|
5 | 5 | use React\Promise\PromiseInterface; |
6 | 6 | use Clue\JsonStream\StreamingJsonParser; |
7 | 7 | use React\Promise\Deferred; |
| 8 | +use React\Stream\ReadableStream; |
| 9 | +use React\Stream\ReadableStreamInterface; |
| 10 | +use RuntimeException; |
| 11 | +use React\Promise\CancellablePromiseInterface; |
8 | 12 |
|
9 | 13 | class StreamingParser |
10 | 14 | { |
11 | | - public function parseResponse(PromiseInterface $promise) |
| 15 | + public function parseJsonStream(PromiseInterface $promise) |
12 | 16 | { |
| 17 | + // TODO: assert expect tcp stream |
| 18 | + |
13 | 19 | $parser = new StreamingJsonParser(); |
14 | 20 |
|
15 | | - $deferred = new Deferred(); |
| 21 | + $out = new ReadableStream(); |
| 22 | + |
| 23 | + // try to cancel promise once the stream closes |
| 24 | + if ($promise instanceof CancellablePromiseInterface) { |
| 25 | + $out->on('close', function() use ($promise) { |
| 26 | + $promise->cancel(); |
| 27 | + }); |
| 28 | + } |
| 29 | + |
| 30 | + $promise->then( |
| 31 | + function ($response) use ($out) { |
| 32 | + $out->close(); |
| 33 | + }, |
| 34 | + function ($error) use ($out) { |
| 35 | + $out->emit('error', array($error, $out)); |
| 36 | + $out->close(); |
| 37 | + }, |
| 38 | + function ($progress) use ($parser, $out) { |
| 39 | + if (is_array($progress) && isset($progress['responseStream'])) { |
| 40 | + $stream = $progress['responseStream']; |
| 41 | + /* @var $stream React\Stream\Stream */ |
16 | 42 |
|
17 | | - $promise->then(null, null, function ($progress) use ($parser, $deferred) { |
18 | | - if (is_array($progress) && isset($progress['response'])) { |
19 | | - $stream = $progress['response']; |
20 | | - /* @var $stream React\Stream\Stream */ |
| 43 | + // hack to do not buffer stream contents in body |
| 44 | + $stream->removeAllListeners('data'); |
21 | 45 |
|
22 | | - // got a streaming HTTP reponse => forward each data chunk to the streaming JSON parser |
23 | | - $stream->on('data', function ($data) use ($parser, $deferred) { |
24 | | - $objects = $parser->push($data); |
| 46 | + // got a streaming HTTP reponse => forward each data chunk to the streaming JSON parser |
| 47 | + $stream->on('data', function ($data) use ($parser, $out) { |
| 48 | + $objects = $parser->push($data); |
25 | 49 |
|
26 | | - foreach ($objects as $object) { |
27 | | - $deferred->progress($object); |
28 | | - } |
29 | | - }); |
| 50 | + foreach ($objects as $object) { |
| 51 | + $out->emit('progress', array($object, $out)); |
| 52 | + } |
| 53 | + }); |
| 54 | + } |
30 | 55 | } |
| 56 | + ); |
| 57 | + |
| 58 | + return $out; |
| 59 | + } |
| 60 | + |
| 61 | + public function deferredStream(ReadableStreamInterface $stream, $progressEventName) |
| 62 | + { |
| 63 | + // cancelling the deferred will (try to) close the stream |
| 64 | + $deferred = new Deferred(function () use ($stream) { |
| 65 | + $stream->close(); |
| 66 | + |
| 67 | + throw new RuntimeException('Cancelled'); |
31 | 68 | }); |
32 | 69 |
|
33 | | - $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); |
| 70 | + if ($stream->isReadable()) { |
| 71 | + // buffer all data events and emit as progress |
| 72 | + $buffered = array(); |
| 73 | + $stream->on($progressEventName, function ($data) use ($deferred, &$buffered) { |
| 74 | + $buffered []= $data; |
| 75 | + $deferred->progress($data); |
| 76 | + }); |
| 77 | + |
| 78 | + // error event rejects |
| 79 | + $stream->on('error', function ($error) use ($deferred) { |
| 80 | + $deferred->reject($error); |
| 81 | + }); |
| 82 | + |
| 83 | + // close event resolves with buffered events (unless already error'ed) |
| 84 | + $stream->on('close', function () use ($deferred, &$buffered) { |
| 85 | + $deferred->resolve($buffered); |
| 86 | + }); |
| 87 | + } else { |
| 88 | + $deferred->reject(new RuntimeException('Stream already ended, looks like it could not be opened')); |
| 89 | + } |
34 | 90 |
|
35 | 91 | return $deferred->promise(); |
36 | 92 | } |
|
0 commit comments