Skip to content

Commit b284cca

Browse files
committed
Add StreamingParser
1 parent 061d2e9 commit b284cca

2 files changed

Lines changed: 39 additions & 1 deletion

File tree

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"php": ">=5.3",
1818
"react/event-loop": "~0.3.0|~0.4.0",
1919
"clue/buzz-react": "~0.2.0",
20-
"react/promise": "~1.0|~2.0"
20+
"react/promise": "~1.0|~2.0",
21+
"clue/json-stream": "~0.1.0"
2122
}
2223
}

src/Io/StreamingParser.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
use React\Promise\PromiseInterface;
6+
use Clue\JsonStream\StreamingJsonParser;
7+
use React\Promise\Deferred;
8+
9+
class StreamingParser
10+
{
11+
public function parseResponse(PromiseInterface $promise)
12+
{
13+
$parser = new StreamingJsonParser();
14+
15+
$deferred = new Deferred();
16+
17+
$promise->then(null, null, function ($progress) use ($parser, $deferred) {
18+
if (is_array($progress) && isset($progress['response'])) {
19+
$stream = $progress['response'];
20+
/* @var $stream React\Stream\Stream */
21+
22+
// got a streaming HTTP reponse => forward each data chunk to the streaming JSON parser
23+
$stream->on('data', function ($data) use ($parser, $deferred) {
24+
$objects = $parser->push($data);
25+
26+
foreach ($objects as $object) {
27+
$deferred->progress($object);
28+
}
29+
});
30+
}
31+
});
32+
33+
$promise->then(array($deferred, 'resolve'), array($deferred, 'reject'));
34+
35+
return $deferred->promise();
36+
}
37+
}

0 commit comments

Comments
 (0)