Skip to content

Commit 9044dbf

Browse files
authored
Merge pull request #1 from tilfin/feature/work-promises
Support workPromise returns Promise array
2 parents d779af9 + c1aa7c8 commit 9044dbf

4 files changed

Lines changed: 129 additions & 20 deletions

File tree

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Node.js transform stream working at constant pace and concurrent for object mode
1212
* Specify work time at once (opts.workMS)
1313
* Specify concurrent workers (opts.concurrency)
1414
* Fire `done` event after when all workers have finished asynchrous -processes
15-
* Counting tag system to call `this.countTag(_tag_)` in `_workPromise`, you can get summarized results `tagCounts` grouped by tag.
15+
* Counting tag system to call `this.countTag(<tag>)` in `_workPromise`, you can get summarized results `tagCounts` grouped by tag.
1616
* Node.js 4.3 or later
1717

1818
## Targets
@@ -28,6 +28,17 @@ $ npm install -save paced-work-stream
2828

2929
## How to Use
3030

31+
### Creating a PacedWorkStream
32+
33+
```
34+
new PacedWorkStream(opts, workPromise);
35+
```
36+
37+
* `opts.concurrency` is the number of concurrent processes.
38+
* `opts.workMS` is milliseconds of work time at once that contains process-time and wait-time.
39+
* `opts.highWaterMark` is maximum object buffer size. If you use flow mode, you should set it the number of source items.
40+
* `workPromise` is `function(item)` must returns a promise processing the item.
41+
3142
```javascript
3243
const es = require('event-stream');
3344

lib/main.js

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const util = require('util');
99
*
1010
* @param {Number} opts.concurrency - number of concurrent process, default is 10
1111
* @param {number} opts.workMS - work-time millseconds at once, default is 0 (no wait)
12-
* @param {Function} workPromise - work-function returns a Promise callwed with an item `function (item)`
12+
* @param {Promise|[Promise]} workPromise - work-function returns a Promise callwed with an item `function (item)`
1313
*/
1414
function PacedWorkStream(opts, workPromise) {
1515
if (!(this instanceof PacedWorkStream))
@@ -23,6 +23,7 @@ function PacedWorkStream(opts, workPromise) {
2323

2424
this.tagCounts = {};
2525
this._bufItems = [];
26+
this._bufPrmss = [];
2627
if (workPromise) {
2728
this._workPromise = workPromise;
2829
}
@@ -35,37 +36,64 @@ util.inherits(PacedWorkStream, Transform);
3536
PacedWorkStream.prototype._transform = function(data, encoding, cb) {
3637
this._bufItems.push(data);
3738

39+
if (this._bufPrmss.length >= this._concurrency) {
40+
const doPromises = this._bufPrmss.splice(0, this._concurrency);
41+
this._process(doPromises, cb);
42+
return;
43+
}
44+
3845
if (this._bufItems.length < this._concurrency) {
3946
cb();
4047
return;
4148
}
4249

4350
const items = this._bufItems.splice(0, this._concurrency);
44-
this._process(items, cb);
51+
try {
52+
this._bufPrmss = this._bufPrmss.concat(this._toPromises(items));
53+
const doPromises = this._bufPrmss.splice(0, this._concurrency);
54+
this._process(doPromises, cb);
55+
} catch (err) {
56+
cb(err);
57+
}
4558
}
4659
PacedWorkStream.prototype._flush = function(cb) {
47-
if (this._bufItems.length) {
48-
this._process(this._bufItems, (err) => {
60+
const restItems = this._bufItems;
61+
if (restItems.length) {
62+
try {
63+
this._bufPrmss = this._bufPrmss.concat(this._toPromises(restItems));
64+
} catch (err) {
4965
cb(err);
50-
this.emit('done');
51-
});
52-
this._bufItems = [];
53-
} else {
54-
cb();
55-
this.emit('done');
66+
return;
67+
}
5668
}
69+
70+
this._flushPromises(cb);
5771
}
5872

59-
PacedWorkStream.prototype._process = function(items, cb) {
60-
let promises;
61-
try {
62-
promises = items.map((item) => {
63-
return this._workPromise(item);
64-
});
65-
} catch (err) {
66-
return cb(err);
73+
PacedWorkStream.prototype._flushPromises = function(cb) {
74+
if (this._bufPrmss.length === 0) {
75+
cb();
76+
this.emit('done');
77+
return;
6778
}
6879

80+
const doPromises = this._bufPrmss.splice(0, this._concurrency);
81+
this._process(doPromises, (err) => {
82+
if (err) {
83+
cb(err);
84+
} else {
85+
this._flushPromises(cb);
86+
}
87+
});
88+
}
89+
PacedWorkStream.prototype._toPromises = function(items) {
90+
let promises = [];
91+
items.forEach((item) => {
92+
promises = promises.concat(this._workPromise(item));
93+
});
94+
return promises;
95+
}
96+
PacedWorkStream.prototype._process = function(promises, cb) {
6997
const startTime = new Date().getTime();
7098
Promise.all(promises)
7199
.then((results) => {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "paced-work-stream",
3-
"version": "0.8.1",
3+
"version": "0.9.0",
44
"description": "Node.js stream working at constant pace and concurrent",
55
"main": "lib/main.js",
66
"scripts": {

test/work_promise.test.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
'use strict';
2+
3+
const chai = require('chai');
4+
const assert = chai.assert;
5+
const es = require('event-stream');
6+
const PacedWorkStream = require('../lib/main');
7+
8+
describe('PacedWorkStream', () => {
9+
context('workPromise returns promises greater than 1', () => {
10+
it('raises done event', (done) => {
11+
const pwStream = new PacedWorkStream({
12+
concurrency: 2,
13+
workMS: 0
14+
}, function(item) {
15+
if (item === 21) {
16+
return [
17+
Promise.resolve('21-1').then(() => { this.countTag('workDone') }),
18+
Promise.resolve('21-2').then(() => { this.countTag('workDone') })
19+
];
20+
} else {
21+
return Promise.resolve(item.toString()).then(() => { this.countTag('workDone') });
22+
}
23+
})
24+
.on('done', function() {
25+
assert.deepEqual(this.tagCounts, { workDone: 6 });
26+
done();
27+
}).on('error', (err) => {
28+
assert.ifError(err);
29+
done();
30+
});
31+
32+
const reader = es.readArray([11, 12, 21, 22, 31])
33+
reader.pipe(pwStream);
34+
});
35+
36+
it('raises done event', (done) => {
37+
const pwStream = new PacedWorkStream({
38+
concurrency: 3,
39+
workMS: 0
40+
}, function(item) {
41+
if (item % 10 === 2) {
42+
const prmss = [];
43+
for (let i = 1; i <= 5; i++) {
44+
prmss.push(Promise.resolve(`${item}-${i}`)
45+
.then((r) => { this.countTag('workDone'); return r; }));
46+
}
47+
return prmss;
48+
} else {
49+
return Promise.resolve(item.toString())
50+
.then((r) => { this.countTag('workDone'); return r; });
51+
}
52+
})
53+
.on('done', function() {
54+
assert.deepEqual(this.tagCounts, { workDone: 13 });
55+
done();
56+
}).on('error', (err) => {
57+
assert.ifError(err);
58+
done();
59+
});
60+
61+
const reader = es.readArray([11, 12, 21, 22, 31])
62+
const writer = es.writeArray(function(err, array) {
63+
assert.deepEqual(array, ['11', '12-1', '12-2', '12-3', '12-4', '12-5',
64+
'21', '22-1', '22-2', '22-3', '22-4', '22-5', '31']);
65+
});
66+
67+
reader.pipe(pwStream).pipe(writer);
68+
});
69+
});
70+
});

0 commit comments

Comments
 (0)