Skip to content

Commit 30c1353

Browse files
committed
Change JSON stream to always report data events instead of progress
1 parent 61103e9 commit 30c1353

7 files changed

Lines changed: 38 additions & 52 deletions

File tree

README.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -330,18 +330,15 @@ $stream = $client->eventsStream();
330330

331331
The resulting stream will emit the following events:
332332

333-
* `progress`: for *each* element in the update stream
334-
* `error`: once if an error occurs, will close() stream then
333+
* `data`: for *each* element in the update stream
334+
* `error`: once if an error occurs, will close() stream then
335335
* Will emit an [`Io\JsonProgressException`](#jsonprogressexception) if an individual progress message contains an error message
336336
* Any other `Exception` in case of an transport error, like invalid request etc.
337-
* `close`: once the stream ends (either finished or after "error")
338-
339-
Please note that the resulting stream does not emit any "data" events, so
340-
you will not be able to pipe() its events into another `WritableStream`.
337+
* `close`: once the stream ends (either finished or after "error")
341338

342339
```php
343340
$stream = $client->imageCreateStream('clue/redis-benchmark');
344-
$stream->on('progress', function ($data) {
341+
$stream->on('data', function ($data) {
345342
// data will be emitted for *each* complete element in the JSON stream
346343
echo $data['status'] . PHP_EOL;
347344
});

examples/events.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
// stream all events for 10 seconds
2121
$stream = $client->eventsStream(null, microtime(true) + 10.0);
2222

23-
$stream->on('progress', function ($event) {
23+
$stream->on('data', function ($event) {
2424
echo json_encode($event) . PHP_EOL;
2525
});
2626

examples/pull.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
$stream = $client->imageCreateStream($image);
1717

18-
$stream->on('progress', function ($progress) {
18+
$stream->on('data', function ($progress) {
1919
echo 'progress: '. json_encode($progress) . PHP_EOL;
2020
});
2121

src/Client.php

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ public function version()
127127
public function events($since = null, $until = null, $filters = array())
128128
{
129129
return $this->streamingParser->deferredStream(
130-
$this->eventsStream($since, $until, $filters),
131-
'progress'
130+
$this->eventsStream($since, $until, $filters)
132131
);
133132
}
134133

@@ -138,12 +137,9 @@ public function events($since = null, $until = null, $filters = array())
138137
* This is a JSON streaming API endpoint that returns a stream instance.
139138
*
140139
* The resulting stream will emit the following events:
141-
* - progress: for *each* element in the update stream
142-
* - error: once if an error occurs, will close() stream then
143-
* - close: once the stream ends (either finished or after "error")
144-
*
145-
* Please note that the resulting stream does not emit any "data" events, so
146-
* you will not be able to pipe() its events into another `WritableStream`.
140+
* - data: for *each* element in the update stream
141+
* - error: once if an error occurs, will close() stream then
142+
* - close: once the stream ends (either finished or after "error")
147143
*
148144
* The optional `$filters` parameter can be used to only get events for
149145
* certain event types, images and/or containers etc. like this:
@@ -758,9 +754,9 @@ public function imageList($all = false)
758754
*/
759755
public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $tag = null, $registry = null, $registryAuth = null)
760756
{
761-
$stream = $this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth);
762-
763-
return $this->streamingParser->deferredStream($stream, 'progress');
757+
return $this->streamingParser->deferredStream(
758+
$this->imageCreateStream($fromImage, $fromSrc, $repo, $tag, $registry, $registryAuth)
759+
);
764760
}
765761

766762
/**
@@ -769,12 +765,9 @@ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $t
769765
* This is a JSON streaming API endpoint that returns a stream instance.
770766
*
771767
* The resulting stream will emit the following events:
772-
* - progress: for *each* element in the update stream
773-
* - error: once if an error occurs, will close() stream then
774-
* - close: once the stream ends (either finished or after "error")
775-
*
776-
* Please note that the resulting stream does not emit any "data" events, so
777-
* you will not be able to pipe() its events into another `WritableStream`.
768+
* - data: for *each* element in the update stream
769+
* - error: once if an error occurs, will close() stream then
770+
* - close: once the stream ends (either finished or after "error").
778771
*
779772
* Pulling a private image from a remote registry will likely require authorization, so make
780773
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for
@@ -871,9 +864,9 @@ public function imageHistory($image)
871864
*/
872865
public function imagePush($image, $tag = null, $registry = null, $registryAuth = null)
873866
{
874-
$stream = $this->imagePushStream($image, $tag, $registry, $registryAuth);
875-
876-
return $this->streamingParser->deferredStream($stream, 'progress');
867+
return $this->streamingParser->deferredStream(
868+
$this->imagePushStream($image, $tag, $registry, $registryAuth)
869+
);
877870
}
878871

879872
/**
@@ -882,12 +875,9 @@ public function imagePush($image, $tag = null, $registry = null, $registryAuth =
882875
* This is a JSON streaming API endpoint that returns a stream instance.
883876
*
884877
* The resulting stream will emit the following events:
885-
* - progress: for *each* element in the update stream
886-
* - error: once if an error occurs, will close() stream then
887-
* - close: once the stream ends (either finished or after "error")
888-
*
889-
* Please note that the resulting stream does not emit any "data" events, so
890-
* you will not be able to pipe() its events into another `WritableStream`.
878+
* - data: for *each* element in the update stream
879+
* - error: once if an error occurs, will close() stream then
880+
* - close: once the stream ends (either finished or after "error")
891881
*
892882
* Pushing to a remote registry will likely require authorization, so make
893883
* sure to pass the $registryAuth parameter, see `self::authHeaders()` for

src/Io/StreamingParser.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public function parseJsonStream(PromiseInterface $promise)
5252
$out->close();
5353
return;
5454
}
55-
$out->emit('progress', array($object, $out));
55+
$out->emit('data', array($object, $out));
5656
}
5757
});
5858

@@ -108,13 +108,12 @@ public function bufferedStream(ReadableStreamInterface $stream)
108108
}
109109

110110
/**
111-
* Returns a promise which resolves with an array of all "progress" events
111+
* Returns a promise which resolves with an array of all "data" events
112112
*
113113
* @param ReadableStreamInterface $stream
114-
* @param string $progressEventName the name of the event to collect
115114
* @return PromiseInterface Promise<array, Exception>
116115
*/
117-
public function deferredStream(ReadableStreamInterface $stream, $progressEventName)
116+
public function deferredStream(ReadableStreamInterface $stream)
118117
{
119118
// cancelling the deferred will (try to) close the stream
120119
$deferred = new Deferred(function () use ($stream) {
@@ -126,7 +125,7 @@ public function deferredStream(ReadableStreamInterface $stream, $progressEventNa
126125
if ($stream->isReadable()) {
127126
// buffer all data events for deferred resolving
128127
$buffered = array();
129-
$stream->on($progressEventName, function ($data) use (&$buffered) {
128+
$stream->on('data', function ($data) use (&$buffered) {
130129
$buffered []= $data;
131130
});
132131

tests/ClientTest.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public function testEvents()
106106

107107
$this->expectRequest('GET', '/events', $this->createResponseJsonStream($json));
108108
$this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream));
109-
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json));
109+
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json));
110110

111111
$this->expectPromiseResolveWith($json, $this->client->events());
112112
}
@@ -118,7 +118,7 @@ public function testEventsArgs()
118118

119119
$this->expectRequest('GET', '/events?since=10&until=20&filters=%7B%22image%22%3A%5B%22busybox%22%2C%22ubuntu%22%5D%7D', $this->createResponseJsonStream($json));
120120
$this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream));
121-
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json));
121+
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json));
122122

123123
$this->expectPromiseResolveWith($json, $this->client->events(10, 20, array('image' => array('busybox', 'ubuntu'))));
124124
}
@@ -357,7 +357,7 @@ public function testImageCreate()
357357

358358
$this->expectRequest('post', '/images/create?fromImage=busybox', $this->createResponseJsonStream($json));
359359
$this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream));
360-
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json));
360+
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json));
361361

362362
$this->expectPromiseResolveWith($json, $this->client->imageCreate('busybox'));
363363
}
@@ -395,7 +395,7 @@ public function testImagePush()
395395

396396
$this->expectRequest('post', '/images/123/push', $this->createResponseJsonStream($json));
397397
$this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream));
398-
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json));
398+
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json));
399399

400400
$this->expectPromiseResolveWith($json, $this->client->imagePush('123'));
401401
}
@@ -419,7 +419,7 @@ public function testImagePushCustomRegistry()
419419

420420
$this->expectRequest('post', '/images/demo.acme.com%3A5000/123/push?tag=test', $this->createResponseJsonStream($json));
421421
$this->streamingParser->expects($this->once())->method('parseJsonStream')->will($this->returnValue($stream));
422-
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream), $this->equalTo('progress'))->will($this->returnPromise($json));
422+
$this->streamingParser->expects($this->once())->method('deferredStream')->with($this->equalTo($stream))->will($this->returnPromise($json));
423423

424424
$this->expectPromiseResolveWith($json, $this->client->imagePush('123', 'test', 'demo.acme.com:5000', $auth));
425425
}

tests/Io/StreamingParserTest.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,19 @@ public function testDeferredClosedStreamWillReject()
7373
$stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
7474
$stream->expects($this->once())->method('isReadable')->will($this->returnValue(false));
7575

76-
$promise = $this->parser->deferredStream($stream, 'anything');
76+
$promise = $this->parser->deferredStream($stream);
7777
$this->expectPromiseReject($promise);
7878
}
7979

8080
public function testDeferredStreamEventsWillBeEmittedAndBuffered()
8181
{
8282
$stream = new ReadableStream();
8383

84-
$promise = $this->parser->deferredStream($stream, 'demo');
84+
$promise = $this->parser->deferredStream($stream);
8585

8686
$stream->emit('ignored', array('ignored'));
87-
$stream->emit('demo', array('a'));
88-
$stream->emit('demo', array('b'));
87+
$stream->emit('data', array('a'));
88+
$stream->emit('data', array('b'));
8989

9090
$stream->close();
9191

@@ -96,11 +96,11 @@ public function testDeferredStreamErrorEventWillRejectPromise()
9696
{
9797
$stream = new ReadableStream();
9898

99-
$promise = $this->parser->deferredStream($stream, 'demo');
99+
$promise = $this->parser->deferredStream($stream);
100100

101101
$stream->emit('ignored', array('ignored'));
102102

103-
$stream->emit('demo', array('a'));
103+
$stream->emit('data', array('a'));
104104

105105
$stream->emit('error', array('value', 'ignord'));
106106

@@ -115,7 +115,7 @@ public function testDeferredCancelingPromiseWillCloseStream()
115115
$stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
116116
$stream->expects($this->once())->method('isReadable')->willReturn(true);
117117

118-
$promise = $this->parser->deferredStream($stream, 'anything');
118+
$promise = $this->parser->deferredStream($stream);
119119
if (!($promise instanceof CancellablePromiseInterface)) {
120120
$this->markTestSkipped('Requires Promise v2 API and has no effect on v1 API');
121121
}

0 commit comments

Comments
 (0)