Skip to content

Commit f4a28bf

Browse files
authored
Merge pull request #50 from clue-labs/stream-semantics
Change JSON stream to always report `data` events instead of `progress`, follow strict stream semantics, support backpressure and improve error handling
2 parents b9b09be + 3bcf964 commit f4a28bf

13 files changed

Lines changed: 362 additions & 144 deletions

README.md

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ its event-driven model to react to changes and events happening.
3434
* [Command streaming](#command-streaming)
3535
* [TAR streaming](#tar-streaming)
3636
* [JSON streaming](#json-streaming)
37-
* [JsonProgressException](#jsonprogressexception)
3837
* [Install](#install)
3938
* [Tests](#tests)
4039
* [License](#license)
@@ -295,17 +294,13 @@ progress events once the stream ends:
295294

296295
```php
297296
$client->imageCreate('clue/streamripper')->then(
298-
function ($data) {
297+
function (array $data) {
299298
// $data is an array of *all* elements in the JSON stream
299+
var_dump($data);
300300
},
301-
function ($error) {
301+
function (Exception $error) {
302302
// an error occurred (possibly after receiving *some* elements)
303-
304-
if ($error instanceof Io\JsonProgressException) {
305-
// a progress message (usually the last) contains an error message
306-
} else {
307-
// any other error, like invalid request etc.
308-
}
303+
echo 'Error: ' . $error->getMessage() . PHP_EOL;
309304
}
310305
);
311306
```
@@ -330,18 +325,15 @@ $stream = $client->eventsStream();
330325

331326
The resulting stream will emit the following events:
332327

333-
* `progress`: for *each* element in the update stream
334-
* `error`: once if an error occurs, will close() stream then
335-
* Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message
336-
* Any other `Exception` in case of an transport error, like invalid request etc.
337-
* `close`: once the stream ends (either finished or after "error")
338-
339-
Please note that the resulting stream does not emit any "data" events, so
340-
you will not be able to pipe() its events into another `WritableStream`.
328+
* `data`: for *each* element in the update stream
329+
* `error`: once if an error occurs, will close() stream then
330+
* Will emit a `RuntimeException` if an individual progress message contains an error message
331+
or any other `Exception` in case of an transport error, like invalid request etc.
332+
* `close`: once the stream ends (either finished or after "error")
341333

342334
```php
343335
$stream = $client->imageCreateStream('clue/redis-benchmark');
344-
$stream->on('progress', function ($data) {
336+
$stream->on('data', function (array $data) {
345337
// data will be emitted for *each* complete element in the JSON stream
346338
echo $data['status'] . PHP_EOL;
347339
});
@@ -353,13 +345,6 @@ $stream->on('close', function () {
353345

354346
See also the [pull example](examples/pull.php) and the [push example](examples/push.php).
355347

356-
### JsonProgressException
357-
358-
The `Io\JsonProgressException` will be thrown by [JSON streaming](#json-streaming)
359-
endpoints if an individual progress message contains an error message.
360-
361-
The `getData()` method can be used to obtain the progress message.
362-
363348
## Install
364349

365350
The recommended way to install this library is [through Composer](https://getcomposer.org).

examples/events.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// stream all events for 10 seconds
2121
$stream = $client->eventsStream(null, microtime(true) + 10.0);
2222

23-
$stream->on('progress', function ($event) {
23+
$stream->on('data', function ($event) {
2424
echo json_encode($event) . PHP_EOL;
2525
});
2626

examples/pull.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
$stream = $client->imageCreateStream($image);
1717

18-
$stream->on('progress', function ($progress) {
18+
$stream->on('data', function ($progress) {
1919
echo 'progress: '. json_encode($progress) . PHP_EOL;
2020
});
2121

src/Client.php

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ public function version()
127127
public function events($since = null, $until = null, $filters = array())
128128
{
129129
return $this->streamingParser->deferredStream(
130-
$this->eventsStream($since, $until, $filters),
131-
'progress'
130+
$this->eventsStream($since, $until, $filters)
132131
);
133132
}
134133

@@ -138,12 +137,9 @@ public function events($since = null, $until = null, $filters = array())
138137
* This is a JSON streaming API endpoint that returns a stream instance.
139138
*
140139
* The resulting stream will emit the following events:
141-
* - progress: for *each* element in the update stream
142-
* - error: once if an error occurs, will close() stream then
143-
* - close: once the stream ends (either finished or after "error")
144-
*
145-
* Please note that the resulting stream does not emit any "data" events, so
146-
* you will not be able to pipe() its events into another `WritableStream`.
140+
* - data: for *each* element in the update stream
141+
* - error: once if an error occurs, will close() stream then
142+
* - close: once the stream ends (either finished or after "error")
147143
*
148144
* The optional `$filters` parameter can be used to only get events for
149145
* certain event types, images and/or containers etc. like this:
@@ -758,9 +754,9 @@ public function imageList($all = false)
758754
*/
759755
public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $tag = null, $registry = null, $registryAuth = null)
760756
{
761-
$stream = $this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth);
762-
763-
return $this->streamingParser->deferredStream($stream, 'progress');
757+
return $this->streamingParser->deferredStream(
758+
$this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth)
759+
);
764760
}
765761

766762
/**
@@ -769,12 +765,9 @@ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $t
769765
* This is a JSON streaming API endpoint that returns a stream instance.
770766
*
771767
* The resulting stream will emit the following events:
772-
* - progress: for *each* element in the update stream
773-
* - error: once if an error occurs, will close() stream then
774-
* - close: once the stream ends (either finished or after "error")
775-
*
776-
* Please note that the resulting stream does not emit any "data" events, so
777-
* you will not be able to pipe() its events into another `WritableStream`.
768+
* - data: for *each* element in the update stream
769+
* - error: once if an error occurs, will close() stream then
770+
* - close: once the stream ends (either finished or after "error").
778771
*
779772
* Pulling a private image from a remote registry will likely require authorization, so make
780773
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for
@@ -871,9 +864,9 @@ public function imageHistory($image)
871864
*/
872865
public function imagePush($image, $tag = null, $registry = null, $registryAuth = null)
873866
{
874-
$stream = $this->imagePushStream($image, $tag, $registry, $registryAuth);
875-
876-
return $this->streamingParser->deferredStream($stream, 'progress');
867+
return $this->streamingParser->deferredStream(
868+
$this->imagePushStream($image, $tag, $registry, $registryAuth)
869+
);
877870
}
878871

879872
/**
@@ -882,12 +875,9 @@ public function imagePush($image, $tag = null, $registry = null, $registryAuth =
882875
* This is a JSON streaming API endpoint that returns a stream instance.
883876
*
884877
* The resulting stream will emit the following events:
885-
* - progress: for *each* element in the update stream
886-
* - error: once if an error occurs, will close() stream then
887-
* - close: once the stream ends (either finished or after "error")
888-
*
889-
* Please note that the resulting stream does not emit any "data" events, so
890-
* you will not be able to pipe() its events into another `WritableStream`.
878+
* - data: for *each* element in the update stream
879+
* - error: once if an error occurs, will close() stream then
880+
* - close: once the stream ends (either finished or after "error")
891881
*
892882
* Pushing to a remote registry will likely require authorization, so make
893883
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for

src/Io/JsonProgressException.php

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/Io/ReadableDemultiplexStream.php

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
namespace Clue\React\Docker\Io;
44

5-
use React\Stream\ReadableStreamInterface;
65
use Evenement\EventEmitter;
7-
use React\Stream\WritableStreamInterface;
6+
use React\Stream\ReadableStreamInterface;
87
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
910
/**
1011
* Parser for Docker's own frame format used for bidrectional frames
1112
*
@@ -48,7 +49,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =
4849

4950
// buffer must be empty on end, otherwise this is an error situation
5051
if ($buffer === '') {
51-
$out->emit('end', array());
52+
$out->emit('end');
5253
} else {
5354
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
5455
}
@@ -62,7 +63,7 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =
6263
});
6364

6465
// forward close event to output
65-
$multiplexed->on('close', function ($error) use ($out) {
66+
$multiplexed->on('close', function () use ($out) {
6667
$out->close();
6768
});
6869
}
@@ -130,7 +131,9 @@ public function close()
130131

131132
// closing output stream closes input stream
132133
$this->multiplexed->close();
134+
$this->buffer = '';
133135

134-
$this->emit('close', array());
136+
$this->emit('close');
137+
$this->removeAllListeners();
135138
}
136139
}

src/Io/ReadableJsonStream.php

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
use Clue\JsonStream\StreamingJsonParser;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
use Evenement\EventEmitter;
10+
11+
/**
12+
* Parser for Docker's JSON stream format used for log messages etc.
13+
*
14+
* @internal
15+
*/
16+
class ReadableJsonStream extends EventEmitter implements ReadableStreamInterface
17+
{
18+
private $closed = false;
19+
private $input;
20+
private $parser;
21+
22+
public function __construct(ReadableStreamInterface $input)
23+
{
24+
$this->input = $input;
25+
$this->parser = $parser = new StreamingJsonParser();
26+
if (!$input->isReadable()) {
27+
$this->close();
28+
return;
29+
}
30+
31+
// pass all input data chunks through the parser
32+
$input->on('data', array($this, 'handleData'));
33+
34+
// forward end event to output
35+
$out = $this;
36+
$closed =& $this->closed;
37+
$input->on('end', function () use ($out, $parser, &$closed) {
38+
// ignore duplicate end events
39+
if ($closed) {
40+
return;
41+
}
42+
43+
if ($parser->isEmpty()) {
44+
$out->emit('end');
45+
} else {
46+
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete JSON data')));
47+
}
48+
$out->close();
49+
});
50+
51+
// forward error event to output
52+
$input->on('error', function ($error) use ($out) {
53+
$out->emit('error', array($error));
54+
$out->close();
55+
});
56+
57+
// forward close event to output
58+
$input->on('close', function () use ($out) {
59+
$out->close();
60+
});
61+
}
62+
63+
/**
64+
* push the given stream chunk into the parser buffer and try to extract all JSON messages
65+
*
66+
* @internal
67+
* @param string $data
68+
*/
69+
public function handleData($data)
70+
{
71+
// forward each data chunk to the streaming JSON parser
72+
try {
73+
$objects = $this->parser->push($data);
74+
} catch (\Exception $e) {
75+
$this->emit('error', array($e));
76+
$this->close();
77+
return;
78+
}
79+
80+
foreach ($objects as $object) {
81+
// stop emitting data if stream is already closed
82+
if ($this->closed) {
83+
return;
84+
}
85+
86+
if (isset($object['error'])) {
87+
$this->emit('error', array(new \RuntimeException($object['error'])));
88+
$this->close();
89+
return;
90+
}
91+
$this->emit('data', array($object));
92+
}
93+
}
94+
95+
public function pause()
96+
{
97+
$this->input->pause();
98+
}
99+
100+
public function resume()
101+
{
102+
$this->input->resume();
103+
}
104+
105+
public function isReadable()
106+
{
107+
return $this->input->isReadable();
108+
}
109+
110+
public function pipe(WritableStreamInterface $dest, array $options = array())
111+
{
112+
return Util::pipe($this, $dest, $options);
113+
}
114+
115+
public function close()
116+
{
117+
if ($this->closed) {
118+
return;
119+
}
120+
121+
$this->closed = true;
122+
123+
// closing output stream closes input stream
124+
$this->input->close();
125+
126+
$this->emit('close');
127+
$this->removeAllListeners();
128+
}
129+
}

0 commit comments

Comments
 (0)