|
12 | 12 | * Creates a `Promise` which resolves with the stream data buffer |
13 | 13 | * |
14 | 14 | * @param ReadableStreamInterface $stream |
| 15 | + * @param int|null $maxLength Maximum number of bytes to buffer or null for unlimited. |
15 | 16 | * @return Promise\CancellablePromiseInterface Promise<string, Exception> |
16 | 17 | */ |
17 | | -function buffer(ReadableStreamInterface $stream) |
| 18 | +function buffer(ReadableStreamInterface $stream, $maxLength = null) |
18 | 19 | { |
19 | 20 | // stream already ended => resolve with empty buffer |
20 | 21 | if (!$stream->isReadable()) { |
21 | 22 | return Promise\resolve(''); |
22 | 23 | } |
23 | 24 |
|
| 25 | + $deferred = new Promise\Deferred(); |
24 | 26 | $buffer = ''; |
25 | | - $bufferer = function ($data) use (&$buffer) { |
| 27 | + $bufferer = function ($data) use (&$buffer, $deferred, $maxLength) { |
26 | 28 | $buffer .= $data; |
| 29 | + if ($maxLength !== null && isset($buffer[$maxLength])) { |
| 30 | + $deferred->reject(new \OverflowException('Buffer exceeded maximum length')); |
| 31 | + } |
27 | 32 | }; |
28 | 33 | $stream->on('data', $bufferer); |
29 | 34 |
|
30 | | - $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, &$buffer) { |
| 35 | + $promise = new Promise\Promise(function ($resolve, $reject) use ($stream, $deferred, &$buffer) { |
| 36 | + $deferred->promise()->then($resolve, $reject); |
| 37 | + |
31 | 38 | $stream->on('error', function ($error) use ($reject) { |
32 | 39 | $reject(new \RuntimeException('An error occured on the underlying stream while buffering', 0, $error)); |
33 | 40 | }); |
|
0 commit comments