Skip to content

Commit 792faff

Browse files
committed
Update dependencies for more reliable streaming APIs
Update to clue/buzz-react:0.5 to support PSR-7 messages and take advantange of its new streaming API. Use clue/promise-stream-react for more reliable promise unwrapping, better error reporting and advanced back pressure support
1 parent 5c197f8 commit 792faff

8 files changed

Lines changed: 33 additions & 58 deletions

File tree

composer.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
"require": {
1717
"php": ">=5.3",
1818
"react/event-loop": "~0.3.0|~0.4.0",
19-
"clue/buzz-react": "~0.4.0",
19+
"clue/buzz-react": "^0.5",
2020
"react/promise": "~2.0|~1.1",
2121
"clue/json-stream": "~0.1.0",
22-
"rize/uri-template": "^0.3"
22+
"rize/uri-template": "^0.3",
23+
"clue/promise-stream-react": "^0.1"
2324
},
2425
"require-dev": {
2526
"clue/tar-react": "~0.1.0",

examples/export.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
});
2626

2727
$out = new Stream(fopen($target, 'w'), $loop);
28+
$out->pause();
2829
$stream->pipe($out);
2930

3031
$loop->run();

src/Client.php

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Clue\React\Docker;
44

55
use Clue\React\Buzz\Browser;
6-
use Clue\React\Buzz\Message\Response;
76
use Clue\React\Docker\Io\ResponseParser;
87
use React\Promise\PromiseInterface as Promise;
98
use Clue\React\Docker\Io\StreamingParser;
@@ -254,7 +253,7 @@ public function containerExport($container)
254253
public function containerExportStream($container)
255254
{
256255
return $this->streamingParser->parsePlainStream(
257-
$this->browser->get(
256+
$this->browser->withOptions(array('streaming' => true))->get(
258257
$this->uri->expand(
259258
'/containers/{container}/export',
260259
array(
@@ -512,14 +511,17 @@ public function containerCopy($container, $config)
512511
public function containerCopyStream($container, $config)
513512
{
514513
return $this->streamingParser->parsePlainStream(
515-
$this->postJson(
514+
$this->browser->withOptions(array('streaming' => true))->post(
516515
$this->uri->expand(
517516
'/containers/{container}/copy',
518517
array(
519518
'container' => $container
520519
)
521520
),
522-
$config
521+
array(
522+
'Content-Type' => 'application/json'
523+
),
524+
$this->json($config)
523525
)
524526
);
525527
}
@@ -605,7 +607,7 @@ public function imageCreate($fromImage = null, $fromSrc = null, $repo = null, $t
605607
public function imageCreateStream($fromImage = null, $fromSrc = null, $repo = null, $tag = null, $registry = null, $registryAuth = null)
606608
{
607609
return $this->streamingParser->parseJsonStream(
608-
$this->browser->post(
610+
$this->browser->withOptions(array('streaming' => true))->post(
609611
$this->uri->expand(
610612
'/images/create{?fromImage,fromSrc,repo,tag,registry}',
611613
array(
@@ -715,7 +717,7 @@ public function imagePush($image, $tag = null, $registry = null, $registryAuth =
715717
public function imagePushStream($image, $tag = null, $registry = null, $registryAuth = null)
716718
{
717719
return $this->streamingParser->parseJsonStream(
718-
$this->browser->post(
720+
$this->browser->withOptions(array('streaming' => true))->post(
719721
$this->uri->expand(
720722
'/images{/registry}/{image}/push{?tag}',
721723
array(

src/Io/ResponseParser.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,25 @@
22

33
namespace Clue\React\Docker\Io;
44

5-
use Clue\React\Buzz\Message\Response;
5+
use Psr\Http\Message\ResponseInterface;
66

77
class ResponseParser
88
{
9-
public function expectPlain(Response $response)
9+
public function expectPlain(ResponseInterface $response)
1010
{
1111
// text/plain
1212

1313
return (string)$response->getBody();
1414
}
1515

16-
public function expectJson(Response $response)
16+
public function expectJson(ResponseInterface $response)
1717
{
1818
// application/json
1919

2020
return json_decode((string)$response->getBody(), true);
2121
}
2222

23-
public function expectEmpty(Response $response)
23+
public function expectEmpty(ResponseInterface $response)
2424
{
2525
// 204 No Content
2626
// no content-type

src/Io/StreamingParser.php

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
use React\Stream\ReadableStreamInterface;
1010
use RuntimeException;
1111
use React\Promise\CancellablePromiseInterface;
12+
use Clue\React\Promise\Stream;
13+
use Psr\Http\Message\ResponseInterface;
1214

1315
class StreamingParser
1416
{
@@ -58,40 +60,9 @@ public function parsePlainStream(PromiseInterface $promise)
5860
{
5961
// text/plain
6062

61-
$out = new ReadableStream();
62-
63-
// try to cancel promise once the stream closes
64-
if ($promise instanceof CancellablePromiseInterface) {
65-
$out->on('close', function() use ($promise) {
66-
$promise->cancel();
67-
});
68-
}
69-
70-
$promise->then(
71-
function ($response) use ($out) {
72-
$out->close();
73-
},
74-
function ($error) use ($out) {
75-
$out->emit('error', array($error, $out));
76-
$out->close();
77-
},
78-
function ($progress) use ($out) {
79-
if (is_array($progress) && isset($progress['responseStream'])) {
80-
$stream = $progress['responseStream'];
81-
/* @var $stream React\Stream\Stream */
82-
83-
// hack to do not buffer stream contents in body
84-
$stream->removeAllListeners('data');
85-
86-
// got a streaming HTTP response => forward each data chunk to the resulting output stream
87-
$stream->on('data', function ($data) use ($out) {
88-
$out->emit('data', array($data, $out));
89-
});
90-
}
91-
}
92-
);
93-
94-
return $out;
63+
return Stream\unwrapReadable($promise->then(function (ResponseInterface $response) {
64+
return $response->getBody();
65+
}));
9566
}
9667

9768
public function deferredStream(ReadableStreamInterface $stream, $progressEventName)

tests/ClientTest.php

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
<?php
22

3-
use Clue\React\Buzz\Message\Response;
4-
use Clue\React\Buzz\Message\Body;
53
use Clue\React\Docker\Client;
64
use React\Promise\Deferred;
75
use Clue\React\Buzz\Browser;
6+
use RingCentral\Psr7\Response;
7+
use Psr\Http\Message\ResponseInterface;
8+
use Psr\Http\Message\RequestInterface;
89

910
class ClientTest extends TestCase
1011
{
@@ -361,7 +362,7 @@ public function testExecResize()
361362
$this->expectPromiseResolveWith('', $this->client->execResize(123, 800, 600));
362363
}
363364

364-
private function expectRequestFlow($method, $url, Response $response, $parser)
365+
private function expectRequestFlow($method, $url, ResponseInterface $response, $parser)
365366
{
366367
$return = (string)$response->getBody();
367368
if ($parser === 'expectJson') {
@@ -372,10 +373,10 @@ private function expectRequestFlow($method, $url, Response $response, $parser)
372373
$this->parser->expects($this->once())->method($parser)->with($this->equalTo($response))->will($this->returnValue($return));
373374
}
374375

375-
private function expectRequest($method, $url, Response $response)
376+
private function expectRequest($method, $url, ResponseInterface $response)
376377
{
377378
$that = $this;
378-
$this->sender->expects($this->once())->method('send')->with($this->callback(function ($request) use ($that, $method, $url) {
379+
$this->sender->expects($this->once())->method('send')->with($this->callback(function (RequestInterface $request) use ($that, $method, $url) {
379380
$that->assertEquals(strtoupper($method), $request->getMethod());
380381
$that->assertEquals('http://x' . $url, (string)$request->getUri());
381382

@@ -385,7 +386,7 @@ private function expectRequest($method, $url, Response $response)
385386

386387
private function createResponse($body = '')
387388
{
388-
return new Response('HTTP/1.0', 200, 'OK', array(), new Body($body));
389+
return new Response(200, array(), $body);
389390
}
390391

391392
private function createResponseJson($json)

tests/Io/ResponseParserTest.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
<?php
22

33
use Clue\React\Docker\Io\ResponseParser;
4-
use Clue\React\Buzz\Message\Response;
5-
use Clue\React\Buzz\Message\Body;
4+
use RingCentral\Psr7\Response;
65

76
class ResponseParserTest extends TestCase
87
{
@@ -32,7 +31,7 @@ public function testEmpty()
3231

3332
private function createResponse($body = '')
3433
{
35-
return new Response('HTTP/1.0', 200, 'OK', array(), new Body($body));
34+
return new Response(200, array(), $body);
3635
}
3736

3837
}

tests/Io/StreamingParserTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,18 @@ public function testJsonRejectingPromiseWillEmitErrorAndCloseEvent()
4141
$this->assertFalse($stream->isReadable());
4242
}
4343

44-
public function testJsonResolvingPromiseWillEmitCloseEvent()
44+
public function testJsonResolvingPromiseWithWrongValueWillEmitErrorAndCloseEvent()
4545
{
4646
$deferred = new Deferred();
4747

4848
$stream = $this->parser->parseJsonStream($deferred->promise());
4949

5050
$this->assertTrue($stream->isReadable());
5151

52-
$stream->on('error', $this->expectCallableNever());
52+
$stream->on('error', $this->expectCallableOnce());
5353
$stream->on('close', $this->expectCallableOnce());
5454

55-
$deferred->resolve('data');
55+
$deferred->resolve('not a stream');
5656

5757
$this->assertFalse($stream->isReadable());
5858
}

0 commit comments

Comments
 (0)