Skip to content

Commit 4923646

Browse files
committed
Merge pull request #40 from clue-labs/demultiplex
Demultiplex execStart() and execStartStream() streaming APIs
2 parents da95147 + 38924cf commit 4923646

10 files changed

Lines changed: 424 additions & 19 deletions

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,23 @@ $stream->on('close', function () {
221221
});
222222
```
223223

224+
Note that by default the output of both STDOUT and STDERR will be emitted
225+
as normal `data` events. You can optionally pass a custom event name which
226+
will be used to emit STDERR data so that it can be handled separately.
227+
Note that the normal streaming primitives likely do not know about this
228+
event, so special care may have to be taken.
229+
Also note that this option has no effect if you execute with a TTY.
230+
231+
```php
232+
$stream = $client->execStartStream($exec, $tty, 'stderr');
233+
$stream->on('data', function ($data) {
234+
echo 'STDOUT data: ' . $data;
235+
});
236+
$stream->on('stderr', function ($data) {
237+
echo 'STDERR data: ' . $data;
238+
});
239+
```
240+
224241
See also the [streaming exec example](examples/exec-stream.php) and the [exec benchmark example](examples/benchmark-exec.php).
225242

226243
The TTY mode should be set depending on whether your command needs a TTY

examples/exec-inspect.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
$factory = new Factory($loop);
2424
$client = $factory->createClient();
2525

26-
$client->execCreate($container, $cmd, true)->then(function ($info) use ($client) {
26+
$client->execCreate($container, $cmd)->then(function ($info) use ($client) {
2727
echo 'Created with info: ' . json_encode($info) . PHP_EOL;
2828

2929
return $client->execInspect($info['Id']);
3030
})->then(function ($info) use ($client) {
3131
echo 'Inspected after creation: ' . json_encode($info, JSON_PRETTY_PRINT) . PHP_EOL;
3232

33-
return $client->execStart($info['ID'], true)->then(function ($out) use ($client, $info) {
33+
return $client->execStart($info['ID'])->then(function ($out) use ($client, $info) {
3434
echo 'Starting returned: ';
3535
var_dump($out);
3636

examples/exec-stream.php

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,36 @@
2727
$out = new Stream(STDOUT, $loop);
2828
$out->pause();
2929

30-
$client->execCreate($container, $cmd, true)->then(function ($info) use ($client, $out) {
31-
$stream = $client->execStartStream($info['Id'], true);
30+
$stderr = new Stream(STDERR, $loop);
31+
$stderr->pause();
32+
33+
// unkown exit code by default
34+
$exit = 1;
35+
36+
$client->execCreate($container, $cmd)->then(function ($info) use ($client, $out, $stderr, &$exit) {
37+
$stream = $client->execStartStream($info['Id'], false, 'stderr');
3238
$stream->pipe($out);
3339

40+
// forward custom stderr event to STDERR stream
41+
$stream->on('stderr', function ($data) use ($stderr, $stream) {
42+
if ($stderr->write($data) === false) {
43+
$stream->pause();
44+
$stderr->once('drain', function () use ($stream) {
45+
$stream->resume();
46+
});
47+
}
48+
});
49+
3450
$stream->on('error', 'printf');
3551

36-
// exit with error code of executed command once it closes
37-
$stream->on('close', function () use ($client, $info) {
38-
$client->execInspect($info['Id'])->then(function ($info) {
39-
exit($info['ExitCode']);
52+
// remember exit code of executed command once it closes
53+
$stream->on('close', function () use ($client, $info, &$exit) {
54+
$client->execInspect($info['Id'])->then(function ($info) use (&$exit) {
55+
$exit = $info['ExitCode'];
4056
}, 'printf');
4157
});
4258
}, 'printf');
4359

4460
$loop->run();
61+
62+
exit($exit);

src/Client.php

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,16 +1008,24 @@ public function execStartDetached($exec, $tty = false)
10081008
* This works for command output of any size as only small chunks have to
10091009
* be kept in memory.
10101010
*
1011-
* @param string $exec exec ID
1012-
* @param boolean $tty tty mode
1011+
* Note that by default the output of both STDOUT and STDERR will be emitted
1012+
* as normal "data" events. You can optionally pass a custom event name which
1013+
* will be used to emit STDERR data so that it can be handled separately.
1014+
* Note that the normal streaming primitives likely do not know about this
1015+
* event, so special care may have to be taken.
1016+
* Also note that this option has no effect if you execute with a TTY.
1017+
*
1018+
* @param string $exec exec ID
1019+
* @param boolean $tty tty mode
1020+
* @param string $stderrEvent custom event to emit for STDERR data (otherwise emits as "data")
10131021
* @return ReadableStreamInterface stream of exec data
10141022
* @link https://docs.docker.com/reference/api/docker_remote_api_v1.15/#exec-start
10151023
* @see self::execStart()
10161024
* @see self::execStartDetached()
10171025
*/
1018-
public function execStartStream($exec, $tty = false)
1026+
public function execStartStream($exec, $tty = false, $stderrEvent = null)
10191027
{
1020-
return $this->streamingParser->parsePlainStream(
1028+
$stream = $this->streamingParser->parsePlainStream(
10211029
$this->browser->withOptions(array('streaming' => true))->post(
10221030
$this->uri->expand(
10231031
'/exec/{exec}/start',
@@ -1033,6 +1041,13 @@ public function execStartStream($exec, $tty = false)
10331041
))
10341042
)
10351043
);
1044+
1045+
// this is a multiplexed stream unless this is started with a TTY
1046+
if (!$tty) {
1047+
$stream = $this->streamingParser->demultiplexStream($stream, $stderrEvent);
1048+
}
1049+
1050+
return $stream;
10361051
}
10371052

10381053
/**
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
use React\Stream\ReadableStreamInterface;
6+
use Evenement\EventEmitter;
7+
use React\Stream\WritableStreamInterface;
8+
use React\Stream\Util;
9+
/**
10+
* Parser for Docker's own frame format used for bidrectional frames
11+
*
12+
* Each frame consists of a simple header containing the stream identifier and the payload length
13+
* plus the actual payload string.
14+
*
15+
* @internal
16+
* @link https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
17+
*/
18+
class ReadableDemultiplexStream extends EventEmitter implements ReadableStreamInterface
19+
{
20+
private $buffer = '';
21+
private $closed = false;
22+
private $multiplexed;
23+
private $stderrEvent;
24+
25+
public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent = null)
26+
{
27+
$this->multiplexed = $multiplexed;
28+
29+
if ($stderrEvent === null) {
30+
$stderrEvent = 'data';
31+
}
32+
33+
$this->stderrEvent = $stderrEvent;
34+
35+
$out = $this;
36+
$buffer =& $this->buffer;
37+
$closed =& $this->closed;
38+
39+
// pass all input data chunks through the parser
40+
$multiplexed->on('data', array($out, 'push'));
41+
42+
// forward end event to output (unless parsing is still in progress)
43+
$multiplexed->on('end', function () use (&$buffer, $out, &$closed) {
44+
// ignore duplicate end events
45+
if ($closed) {
46+
return;
47+
}
48+
49+
// buffer must be empty on end, otherwise this is an error situation
50+
if ($buffer === '') {
51+
$out->emit('end', array());
52+
} else {
53+
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
54+
}
55+
$out->close();
56+
});
57+
58+
// forward error event to output
59+
$multiplexed->on('error', function ($error) use ($out) {
60+
$out->emit('error', array($error));
61+
$out->close();
62+
});
63+
64+
// forward close event to output
65+
$multiplexed->on('close', function ($error) use ($out) {
66+
$out->close();
67+
});
68+
}
69+
70+
/**
71+
* push the given stream chunk into the parser buffer and try to extract all frames
72+
*
73+
* @internal
74+
* @param string $chunk
75+
*/
76+
public function push($chunk)
77+
{
78+
$this->buffer .= $chunk;
79+
80+
while ($this->buffer !== '') {
81+
if (!isset($this->buffer[7])) {
82+
// last header byte not set => no complete header in buffer
83+
break;
84+
}
85+
86+
$header = unpack('Cstream/x/x/x/Nlength', substr($this->buffer, 0, 8));
87+
88+
if (!isset($this->buffer[7 + $header['length']])) {
89+
// last payload byte not set => message payload is incomplete
90+
break;
91+
}
92+
93+
$payload = substr($this->buffer, 8, $header['length']);
94+
$this->buffer = (string)substr($this->buffer, 8 + $header['length']);
95+
96+
$this->emit(
97+
($header['stream'] === 2) ? $this->stderrEvent : 'data',
98+
array($payload)
99+
);
100+
}
101+
}
102+
103+
public function pause()
104+
{
105+
$this->multiplexed->pause();
106+
}
107+
108+
public function resume()
109+
{
110+
$this->multiplexed->resume();
111+
}
112+
113+
public function isReadable()
114+
{
115+
return $this->multiplexed->isReadable();
116+
}
117+
118+
public function pipe(WritableStreamInterface $dest, array $options = array())
119+
{
120+
return Util::pipe($this, $dest, $options);
121+
}
122+
123+
public function close()
124+
{
125+
if ($this->closed) {
126+
return;
127+
}
128+
129+
$this->closed = true;
130+
131+
// closing output stream closes input stream
132+
$this->multiplexed->close();
133+
134+
$this->emit('close', array());
135+
}
136+
}

src/Io/StreamingParser.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ public function parsePlainStream(PromiseInterface $promise)
8484
}));
8585
}
8686

87+
/**
88+
* Returns a readable plain text stream for the given multiplexed stream using Docker's "attach multiplexing protocol"
89+
*
90+
* @param ReadableStreamInterface $input
91+
* @param string $stderrEvent
92+
* @return ReadableStreamInterface
93+
*/
94+
public function demultiplexStream(ReadableStreamInterface $input, $stderrEvent = null)
95+
{
96+
return new ReadableDemultiplexStream($input, $stderrEvent);
97+
}
98+
8799
/**
88100
* Returns a promise which resolves with the buffered stream contents of the given stream
89101
*

tests/ClientTest.php

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,22 +408,48 @@ public function testExecStart()
408408

409409
$this->expectRequest('POST', '/exec/123/start', $this->createResponse($data));
410410
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
411+
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
411412
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data));
412413

413414
$this->expectPromiseResolveWith($data, $this->client->execStart(123, $config));
414415
}
415416

416-
public function testExecStartStream()
417+
public function testExecStartStreamWithoutTtyWillDemultiplex()
417418
{
418419
$config = array();
419420
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
420421

421422
$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
422423
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
424+
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
423425

424426
$this->assertSame($stream, $this->client->execStartStream(123, $config));
425427
}
426428

429+
public function testExecStartStreamWithTtyWillNotDemultiplex()
430+
{
431+
$config = array('Tty' => true);
432+
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
433+
434+
$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
435+
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
436+
$this->streamingParser->expects($this->never())->method('demultiplexStream');
437+
438+
$this->assertSame($stream, $this->client->execStartStream(123, $config));
439+
}
440+
441+
public function testExecStartStreamWithCustomStderrEvent()
442+
{
443+
$config = array();
444+
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
445+
446+
$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
447+
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
448+
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream, 'stderr')->willReturn($stream);
449+
450+
$this->assertSame($stream, $this->client->execStartStream(123, $config, 'stderr'));
451+
}
452+
427453
public function testExecResize()
428454
{
429455
$this->expectRequestFlow('POST', '/exec/123/resize?w=800&h=600', $this->createResponse(), 'expectEmpty');

0 commit comments

Comments
 (0)