Skip to content

Commit 5dfbf6f

Browse files
committed
Add StreamingParser::parsePlainStream() helper
1 parent 5c064e6 commit 5dfbf6f

2 files changed

Lines changed: 163 additions & 9 deletions

File tree

src/Io/StreamingParser.php

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,44 @@ class StreamingParser
1414
{
1515
public function parseJsonStream(PromiseInterface $promise)
1616
{
17-
// TODO: assert expect tcp stream
17+
// application/json
1818

19+
$in = $this->parsePlainStream($promise);
20+
$out = new ReadableStream();
21+
22+
// invalid/closing input stream => return closed output stream
23+
if (!$in->isReadable()) {
24+
$out->close();
25+
26+
return $out;
27+
}
28+
29+
// forward each data chunk to the streaming JSON parser
1930
$parser = new StreamingJsonParser();
31+
$in->on('data', function ($data) use ($parser, $out) {
32+
$objects = $parser->push($data);
33+
34+
foreach ($objects as $object) {
35+
$out->emit('progress', array($object, $out));
36+
}
37+
});
38+
39+
// forward error and make sure stream closes
40+
$in->on('error', function ($error) use ($out) {
41+
$out->emit('error', array($error, $out));
42+
$out->close();
43+
});
44+
45+
// closing either stream closes the other one
46+
$in->on('close', array($out, 'close'));
47+
$out->on('close', array($in, 'close'));
48+
49+
return $out;
50+
}
51+
52+
public function parsePlainStream(PromiseInterface $promise)
53+
{
54+
// text/plain
2055

2156
$out = new ReadableStream();
2257

@@ -35,21 +70,17 @@ function ($error) use ($out) {
3570
$out->emit('error', array($error, $out));
3671
$out->close();
3772
},
38-
function ($progress) use ($parser, $out) {
73+
function ($progress) use ($out) {
3974
if (is_array($progress) && isset($progress['responseStream'])) {
4075
$stream = $progress['responseStream'];
4176
/* @var $stream React\Stream\Stream */
4277

4378
// hack to do not buffer stream contents in body
4479
$stream->removeAllListeners('data');
4580

46-
// got a streaming HTTP reponse => forward each data chunk to the streaming JSON parser
47-
$stream->on('data', function ($data) use ($parser, $out) {
48-
$objects = $parser->push($data);
49-
50-
foreach ($objects as $object) {
51-
$out->emit('progress', array($object, $out));
52-
}
81+
// got a streaming HTTP response => forward each data chunk to the resulting output stream
82+
$stream->on('data', function ($data) use ($out) {
83+
$out->emit('data', array($data, $out));
5384
});
5485
}
5586
}

tests/Io/StreamingParserTest.php

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<?php
2+
3+
use Clue\React\Docker\Io\StreamingParser;
4+
use React\Promise\Deferred;
5+
use React\Stream\ReadableStream;
6+
use React\Promise\CancellablePromiseInterface;
7+
8+
class StreamingParserTest extends TestCase
9+
{
10+
private $parser;
11+
12+
public function setUp()
13+
{
14+
$this->parser = new StreamingParser();
15+
}
16+
17+
public function testJsonPassingRejectedPromiseResolvesWithClosedStream()
18+
{
19+
$stream = $this->parser->parseJsonStream($this->createPromiseRejected());
20+
21+
$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $stream);
22+
$this->assertFalse($stream->isReadable());
23+
}
24+
25+
public function testJsonRejectingPromiseWillEmitErrorAndCloseEvent()
26+
{
27+
$deferred = new Deferred();
28+
29+
$stream = $this->parser->parseJsonStream($deferred->promise());
30+
31+
$this->assertTrue($stream->isReadable());
32+
33+
$exception = new RuntimeException();
34+
35+
$stream->on('error', $this->expectCallableOnceWith($exception));
36+
$stream->on('close', $this->expectCallableOnce());
37+
38+
$deferred->reject($exception);
39+
40+
$this->assertFalse($stream->isReadable());
41+
}
42+
43+
public function testJsonResolvingPromiseWillEmitCloseEvent()
44+
{
45+
$deferred = new Deferred();
46+
47+
$stream = $this->parser->parseJsonStream($deferred->promise());
48+
49+
$this->assertTrue($stream->isReadable());
50+
51+
$stream->on('error', $this->expectCallableNever());
52+
$stream->on('close', $this->expectCallableOnce());
53+
54+
$deferred->resolve('data');
55+
56+
$this->assertFalse($stream->isReadable());
57+
}
58+
59+
public function testPlainPassingRejectedPromiseResolvesWithClosedStream()
60+
{
61+
$stream = $this->parser->parsePlainStream($this->createPromiseRejected());
62+
63+
$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $stream);
64+
$this->assertFalse($stream->isReadable());
65+
}
66+
67+
public function testDeferredClosedStreamWillReject()
68+
{
69+
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
70+
$stream->expects($this->once())->method('isReadable')->will($this->returnValue(false));
71+
72+
$promise = $this->parser->deferredStream($stream, 'anything');
73+
$this->expectPromiseReject($promise);
74+
}
75+
76+
public function testDeferredStreamEventsWillBeEmittedAndBuffered()
77+
{
78+
$stream = new ReadableStream();
79+
80+
$promise = $this->parser->deferredStream($stream, 'demo');
81+
82+
$stream->emit('ignored', array('ignored'));
83+
$stream->emit('demo', array('a'));
84+
$stream->emit('demo', array('b'));
85+
86+
$stream->close();
87+
88+
$this->expectPromiseResolveWith(array('a', 'b'), $promise);
89+
}
90+
91+
public function testDeferredStreamErrorEventWillRejectPromise()
92+
{
93+
$stream = new ReadableStream();
94+
95+
$promise = $this->parser->deferredStream($stream, 'demo');
96+
97+
$stream->emit('ignored', array('ignored'));
98+
99+
$stream->emit('demo', array('a'));
100+
101+
$stream->emit('error', array('value', 'ignord'));
102+
103+
$stream->close();
104+
105+
$this->expectPromiseReject($promise);
106+
$promise->then(null, $this->expectCallableOnceWith('value'));
107+
}
108+
109+
public function testDeferredCancelingPromiseWillCloseStream()
110+
{
111+
$this->markTestIncomplete();
112+
113+
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
114+
115+
$promise = $this->parser->deferredStream($stream, 'anything');
116+
if (!($promise instanceof CancellablePromiseInterface)) {
117+
$this->markTestSkipped('Requires Promise v2 API and has no effect on v1 API');
118+
}
119+
120+
$stream->expects($this->once())->method('close');
121+
$promise->cancel();
122+
}
123+
}

0 commit comments

Comments
 (0)