Skip to content

Commit 3bcf964

Browse files
committed
Strict stream semantics, support backpressure and improve error handling
1 parent 30c1353 commit 3bcf964

8 files changed

Lines changed: 326 additions & 94 deletions

README.md

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ its event-driven model to react to changes and events happening.
3434
* [Command streaming](#command-streaming)
3535
* [TAR streaming](#tar-streaming)
3636
* [JSON streaming](#json-streaming)
37-
* [JsonProgressException](#jsonprogressexception)
3837
* [Install](#install)
3938
* [Tests](#tests)
4039
* [License](#license)
@@ -295,17 +294,13 @@ progress events once the stream ends:
295294

296295
```php
297296
$client->imageCreate('clue/streamripper')->then(
298-
function ($data) {
297+
function (array $data) {
299298
// $data is an array of *all* elements in the JSON stream
299+
var_dump($data);
300300
},
301-
function ($error) {
301+
function (Exception $error) {
302302
// an error occurred (possibly after receiving *some* elements)
303-
304-
if ($error instanceof Io\JsonProgressException) {
305-
// a progress message (usually the last) contains an error message
306-
} else {
307-
// any other error, like invalid request etc.
308-
}
303+
echo 'Error: ' . $error->getMessage() . PHP_EOL;
309304
}
310305
);
311306
```
@@ -332,13 +327,13 @@ The resulting stream will emit the following events:
332327

333328
* `data`: for *each* element in the update stream
334329
* `error`: once if an error occurs, will close() stream then
335-
* Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message
336-
* Any other `Exception` in case of an transport error, like invalid request etc.
330+
* Will emit a `RuntimeException` if an individual progress message contains an error message
331+
or any other `Exception` in case of an transport error, like invalid request etc.
337332
* `close`: once the stream ends (either finished or after "error")
338333

339334
```php
340335
$stream = $client->imageCreateStream('clue/redis-benchmark');
341-
$stream->on('data', function ($data) {
336+
$stream->on('data', function (array $data) {
342337
// data will be emitted for *each* complete element in the JSON stream
343338
echo $data['status'] . PHP_EOL;
344339
});
@@ -350,13 +345,6 @@ $stream->on('close', function () {
350345

351346
See also the [pull example](examples/pull.php) and the [push example](examples/push.php).
352347

353-
### JsonProgressException
354-
355-
The `Io\JsonProgressException` will be thrown by [JSON streaming](#json-streaming)
356-
endpoints if an individual progress message contains an error message.
357-
358-
The `getData()` method can be used to obtain the progress message.
359-
360348
## Install
361349

362350
The recommended way to install this library is [through Composer](https://getcomposer.org).

src/Io/JsonProgressException.php

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/Io/ReadableDemultiplexStream.php

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
namespace Clue\React\Docker\Io;
44

5-
use React\Stream\ReadableStreamInterface;
65
use Evenement\EventEmitter;
7-
use React\Stream\WritableStreamInterface;
6+
use React\Stream\ReadableStreamInterface;
87
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
910
/**
1011
* Parser for Docker's own frame format used for bidrectional frames
1112
*
@@ -48,7 +49,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =
4849

4950
// buffer must be empty on end, otherwise this is an error situation
5051
if ($buffer === '') {
51-
$out->emit('end', array());
52+
$out->emit('end');
5253
} else {
5354
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
5455
}
@@ -62,7 +63,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =
6263
});
6364

6465
// forward close event to output
65-
$multiplexed->on('close', function ($error) use ($out) {
66+
$multiplexed->on('close', function () use ($out) {
6667
$out->close();
6768
});
6869
}
@@ -130,7 +131,9 @@ public function close()
130131

131132
// closing output stream closes input stream
132133
$this->multiplexed->close();
134+
$this->buffer = '';
133135

134-
$this->emit('close', array());
136+
$this->emit('close');
137+
$this->removeAllListeners();
135138
}
136139
}

src/Io/ReadableJsonStream.php

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
use Clue\JsonStream\StreamingJsonParser;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
use Evenement\EventEmitter;
10+
11+
/**
12+
* Parser for Docker's JSON stream format used for log messages etc.
13+
*
14+
* @internal
15+
*/
16+
class ReadableJsonStream extends EventEmitter implements ReadableStreamInterface
17+
{
18+
private $closed = false;
19+
private $input;
20+
private $parser;
21+
22+
public function __construct(ReadableStreamInterface $input)
23+
{
24+
$this->input = $input;
25+
$this->parser = $parser = new StreamingJsonParser();
26+
if (!$input->isReadable()) {
27+
$this->close();
28+
return;
29+
}
30+
31+
// pass all input data chunks through the parser
32+
$input->on('data', array($this, 'handleData'));
33+
34+
// forward end event to output
35+
$out = $this;
36+
$closed =& $this->closed;
37+
$input->on('end', function () use ($out, $parser, &$closed) {
38+
// ignore duplicate end events
39+
if ($closed) {
40+
return;
41+
}
42+
43+
if ($parser->isEmpty()) {
44+
$out->emit('end');
45+
} else {
46+
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete JSON data')));
47+
}
48+
$out->close();
49+
});
50+
51+
// forward error event to output
52+
$input->on('error', function ($error) use ($out) {
53+
$out->emit('error', array($error));
54+
$out->close();
55+
});
56+
57+
// forward close event to output
58+
$input->on('close', function () use ($out) {
59+
$out->close();
60+
});
61+
}
62+
63+
/**
64+
* push the given stream chunk into the parser buffer and try to extract all JSON messages
65+
*
66+
* @internal
67+
* @param string $data
68+
*/
69+
public function handleData($data)
70+
{
71+
// forward each data chunk to the streaming JSON parser
72+
try {
73+
$objects = $this->parser->push($data);
74+
} catch (\Exception $e) {
75+
$this->emit('error', array($e));
76+
$this->close();
77+
return;
78+
}
79+
80+
foreach ($objects as $object) {
81+
// stop emitting data if stream is already closed
82+
if ($this->closed) {
83+
return;
84+
}
85+
86+
if (isset($object['error'])) {
87+
$this->emit('error', array(new \RuntimeException($object['error'])));
88+
$this->close();
89+
return;
90+
}
91+
$this->emit('data', array($object));
92+
}
93+
}
94+
95+
public function pause()
96+
{
97+
$this->input->pause();
98+
}
99+
100+
public function resume()
101+
{
102+
$this->input->resume();
103+
}
104+
105+
public function isReadable()
106+
{
107+
return $this->input->isReadable();
108+
}
109+
110+
public function pipe(WritableStreamInterface $dest, array $options = array())
111+
{
112+
return Util::pipe($this, $dest, $options);
113+
}
114+
115+
public function close()
116+
{
117+
if ($this->closed) {
118+
return;
119+
}
120+
121+
$this->closed = true;
122+
123+
// closing output stream closes input stream
124+
$this->input->close();
125+
126+
$this->emit('close');
127+
$this->removeAllListeners();
128+
}
129+
}

src/Io/StreamingParser.php

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22

33
namespace Clue\React\Docker\Io;
44

5+
use Clue\React\Promise\Stream;
6+
use Psr\Http\Message\ResponseInterface;
57
use React\Promise\PromiseInterface;
6-
use Clue\JsonStream\StreamingJsonParser;
78
use React\Promise\Deferred;
8-
use React\Stream\ReadableStream;
99
use React\Stream\ReadableStreamInterface;
1010
use RuntimeException;
11-
use React\Promise\CancellablePromiseInterface;
12-
use Clue\React\Promise\Stream;
13-
use Psr\Http\Message\ResponseInterface;
1411

1512
/**
1613
* StreamingParser is a simple helper to work with the streaming body of HTTP response objects
@@ -31,42 +28,7 @@ public function parseJsonStream(PromiseInterface $promise)
3128
{
3229
// application/json
3330

34-
$in = $this->parsePlainStream($promise);
35-
$out = new ReadableStream();
36-
37-
// invalid/closing input stream => return closed output stream
38-
if (!$in->isReadable()) {
39-
$out->close();
40-
41-
return $out;
42-
}
43-
44-
// forward each data chunk to the streaming JSON parser
45-
$parser = new StreamingJsonParser();
46-
$in->on('data', function ($data) use ($parser, $out) {
47-
$objects = $parser->push($data);
48-
49-
foreach ($objects as $object) {
50-
if (isset($object['error'])) {
51-
$out->emit('error', array(new JsonProgressException($object), $out));
52-
$out->close();
53-
return;
54-
}
55-
$out->emit('data', array($object, $out));
56-
}
57-
});
58-
59-
// forward error and make sure stream closes
60-
$in->on('error', function ($error) use ($out) {
61-
$out->emit('error', array($error, $out));
62-
$out->close();
63-
});
64-
65-
// closing either stream closes the other one
66-
$in->on('close', array($out, 'close'));
67-
$out->on('close', array($in, 'close'));
68-
69-
return $out;
31+
return new ReadableJsonStream($this->parsePlainStream($promise));
7032
}
7133

7234
/**

tests/FunctionalClientTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ public function testImageCreateStreamMissingWillEmitJsonError()
386386
$old || $stream->on('progress', $this->expectCallableNever());
387387
$stream->on('data', $this->expectCallableNever());
388388

389-
// will emit "error" with JsonProgressException and close
390-
$old && $stream->on('error', $this->expectCallableOnceParameter('Clue\React\Docker\Io\JsonProgressException'));
389+
// will emit "error" with RuntimeException and close
390+
$old && $stream->on('error', $this->expectCallableOnceParameter('RuntimeException'));
391391
$old || $stream->on('error', $this->expectCallableOnceParameter('Clue\React\Buzz\Message\ResponseException'));
392392
$stream->on('close', $this->expectCallableOnce());
393393

tests/Io/ReadableDemultiplexStreamTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,28 @@ public function testStreamWillEmitDataOnCompleteFrameChunked()
8282
$this->stream->emit('data', array("st"));
8383
}
8484

85+
public function testCloseFromDataEventWillStopEmittingFurtherDataEvents()
86+
{
87+
$parser = $this->parser;
88+
$this->parser->on('data', function () use ($parser) {
89+
$parser->close();
90+
});
91+
92+
$this->parser->on('data', $this->expectCallableOnceWith('a'));
93+
94+
$this->stream->emit('data', array("\x01\x00\x00\x00" . "\x00\x00\x00\x01" . "a" . "\x01\x00\x00\x00" . "\x00\x00\x00\x01" . "b"));
95+
}
96+
97+
public function testCloseTwiceWillEmitCloseOnceAndRemoveAllListeners()
98+
{
99+
$this->parser->on('close', $this->expectCallableOnce());
100+
101+
$this->parser->close();
102+
$this->parser->close();
103+
104+
$this->assertEquals(array(), $this->parser->listeners('close'));
105+
}
106+
85107
public function testPipeWillBeForwardedToTargetStream()
86108
{
87109
$target = new WritableStream();

0 commit comments

Comments
 (0)