Skip to content

Commit 605e806

Browse files
authored
Merge pull request #3 from tilfin/feature/null-work
Fix when workPromise doesn't return promises
2 parents c1eca52 + ba8d817 commit 605e806

5 files changed

Lines changed: 65 additions & 7 deletions

File tree

example/normal.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
'use strict';
22

33
const es = require('event-stream');
4+
const devnull = require('dev-null');
45

56
const PacedWorkStream = require('../lib/main');
67

78
const pwStream = new PacedWorkStream({
89
concurrency: 2,
9-
workMS: 1000,
10-
highWaterMark: 5
10+
workMS: 1000
1111
}, function(item) {
1212
console.log(new Date().toISOString(), 'Begin', item);
1313

@@ -26,4 +26,4 @@ const pwStream = new PacedWorkStream({
2626
});
2727

2828
const reader = es.readArray([11, 12, 21, 22, 31])
29-
reader.pipe(pwStream);
29+
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));

example/with_promised_lifestream.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ const PacedWorkStream = require('../lib/main');
77

88
const pacedWorker = new PacedWorkStream({
99
concurrency: 2,
10-
workMS: 1000,
11-
highWaterMark: 5
10+
workMS: 1000
1211
}, function(item) {
1312
console.log(new Date().toISOString(), 'Begin', item);
1413

lib/main.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ PacedWorkStream.prototype._flushPromises = function(cb) {
8989
PacedWorkStream.prototype._toPromises = function(items) {
9090
let promises = [];
9191
items.forEach((item) => {
92-
promises = promises.concat(this._workPromise(item));
92+
const pr = this._workPromise(item);
93+
if (pr) {
94+
promises = promises.concat(pr);
95+
}
9396
});
9497
return promises;
9598
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"devDependencies": {
2929
"chai": "^3.5.0",
3030
"coveralls": "^2.11.12",
31+
"dev-null": "^0.1.1",
3132
"event-stream": "^3.3.4",
3233
"istanbul": "^0.4.5",
3334
"mocha": "^3.0.2",

test/work_promise.test.js

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
const chai = require('chai');
44
const assert = chai.assert;
55
const es = require('event-stream');
6+
const devnull = require('dev-null');
7+
68
const PacedWorkStream = require('../lib/main');
79

810
describe('PacedWorkStream', () => {
@@ -30,7 +32,7 @@ describe('PacedWorkStream', () => {
3032
});
3133

3234
const reader = es.readArray([11, 12, 21, 22, 31])
33-
reader.pipe(pwStream);
35+
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));
3436
});
3537

3638
it('raises done event', (done) => {
@@ -67,4 +69,57 @@ describe('PacedWorkStream', () => {
6769
reader.pipe(pwStream).pipe(writer);
6870
});
6971
});
72+
73+
context('workPromise returns no promises sometimes', () => {
74+
it('raises done event', (done) => {
75+
const pwStream = new PacedWorkStream({
76+
concurrency: 3,
77+
workMS: 0
78+
}, function(item) {
79+
if (item % 10 === 2) {
80+
return Promise.resolve(item.toString())
81+
.then((r) => { this.countTag('workDone'); return r; });
82+
}
83+
})
84+
.on('done', function() {
85+
assert.deepEqual(this.tagCounts, { workDone: 2 });
86+
done();
87+
}).on('error', (err) => {
88+
assert.ifError(err);
89+
done();
90+
});
91+
92+
const reader = es.readArray([11, 12, 21, 22, 31])
93+
const writer = es.writeArray(function(err, array) {
94+
assert.deepEqual(array, ['12', '22']);
95+
});
96+
97+
reader.pipe(pwStream).pipe(writer);
98+
});
99+
});
100+
101+
context('workPromise does not return promises at all', () => {
102+
it('raises done event', (done) => {
103+
const pwStream = new PacedWorkStream({
104+
concurrency: 3,
105+
workMS: 0
106+
}, function(item) {
107+
return;
108+
})
109+
.on('done', function() {
110+
assert.deepEqual(this.tagCounts, {});
111+
done();
112+
}).on('error', (err) => {
113+
assert.ifError(err);
114+
done();
115+
});
116+
117+
const reader = es.readArray([11, 12, 21, 22, 31])
118+
const writer = es.writeArray(function(err, array) {
119+
assert.deepEqual(array, []);
120+
});
121+
122+
reader.pipe(pwStream).pipe(writer);
123+
});
124+
});
70125
});

0 commit comments

Comments
 (0)