Skip to content

Commit 61cefef

Browse files
committed
Demultiplex execStart() and execStartStream() streaming APIs
1 parent 7339e16 commit 61cefef

7 files changed

Lines changed: 148 additions & 12 deletions

File tree

examples/exec-inspect.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
$factory = new Factory($loop);
2424
$client = $factory->createClient();
2525

26-
$client->execCreate($container, $cmd, true)->then(function ($info) use ($client) {
26+
$client->execCreate($container, $cmd)->then(function ($info) use ($client) {
2727
echo 'Created with info: ' . json_encode($info) . PHP_EOL;
2828

2929
return $client->execInspect($info['Id']);
3030
})->then(function ($info) use ($client) {
3131
echo 'Inspected after creation: ' . json_encode($info, JSON_PRETTY_PRINT) . PHP_EOL;
3232

33-
return $client->execStart($info['ID'], true)->then(function ($out) use ($client, $info) {
33+
return $client->execStart($info['ID'])->then(function ($out) use ($client, $info) {
3434
echo 'Starting returned: ';
3535
var_dump($out);
3636

examples/exec-stream.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
$out = new Stream(STDOUT, $loop);
2828
$out->pause();
2929

30-
$client->execCreate($container, $cmd, true)->then(function ($info) use ($client, $out) {
31-
$stream = $client->execStartStream($info['Id'], true);
30+
$client->execCreate($container, $cmd)->then(function ($info) use ($client, $out) {
31+
$stream = $client->execStartStream($info['Id']);
3232
$stream->pipe($out);
3333

3434
$stream->on('error', 'printf');

src/Client.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ public function execStartDetached($exec, $tty = false)
10171017
*/
10181018
public function execStartStream($exec, $tty = false)
10191019
{
1020-
return $this->streamingParser->parsePlainStream(
1020+
$stream = $this->streamingParser->parsePlainStream(
10211021
$this->browser->withOptions(array('streaming' => true))->post(
10221022
$this->uri->expand(
10231023
'/exec/{exec}/start',
@@ -1033,6 +1033,13 @@ public function execStartStream($exec, $tty = false)
10331033
))
10341034
)
10351035
);
1036+
1037+
// this is a multiplexed stream unless this is started with a TTY
1038+
if (!$tty) {
1039+
$stream = $this->streamingParser->demultiplexStream($stream);
1040+
}
1041+
1042+
return $stream;
10361043
}
10371044

10381045
/**

src/Io/StreamingParser.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,54 @@ public function parsePlainStream(PromiseInterface $promise)
8484
}));
8585
}
8686

87+
/**
88+
* Returns a readable plain text stream for the given multiplexed stream using Docker's "attach multiplexing protocol"
89+
*
90+
* @param ReadableStreamInterface $input
91+
* @return ReadableStreamInterface
92+
*/
93+
public function demultiplexStream(ReadableStreamInterface $input)
94+
{
95+
$out = new ReadableStream();
96+
$parser = new MultiplexStreamParser();
97+
98+
// pass all input data chunks through the parser
99+
$input->on('data', function ($chunk) use ($parser, $out) {
100+
// once parser emits, forward to output stream
101+
$parser->push($chunk, function ($stream, $data) use ($out) {
102+
$out->emit('data', array($data));
103+
});
104+
});
105+
106+
// forward end event to output (unless parsing is still in progress)
107+
$input->on('end', function () use ($out, $parser) {
108+
if ($parser->isEmpty()) {
109+
$out->emit('end', array());
110+
} else {
111+
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
112+
}
113+
$out->close();
114+
});
115+
116+
// forward error event to output
117+
$input->on('error', function ($error) use ($out) {
118+
$out->emit('error', array($error));
119+
$out->close();
120+
});
121+
122+
// forward close event to output
123+
$input->on('close', function ($error) use ($out) {
124+
$out->close();
125+
});
126+
127+
// closing output stream closes input stream
128+
$out->on('close', function () use ($input) {
129+
$input->close();
130+
});
131+
132+
return $out;
133+
}
134+
87135
/**
88136
* Returns a promise which resolves with the buffered stream contents of the given stream
89137
*

tests/ClientTest.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,18 +408,32 @@ public function testExecStart()
408408

409409
$this->expectRequest('POST', '/exec/123/start', $this->createResponse($data));
410410
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
411+
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
411412
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data));
412413

413414
$this->expectPromiseResolveWith($data, $this->client->execStart(123, $config));
414415
}
415416

416-
public function testExecStartStream()
417+
public function testExecStartStreamWithoutTtyWillDemultiplex()
417418
{
418419
$config = array();
419420
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
420421

421422
$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
422423
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
424+
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
425+
426+
$this->assertSame($stream, $this->client->execStartStream(123, $config));
427+
}
428+
429+
public function testExecStartStreamWithTtyWillNotDemultiplex()
430+
{
431+
$config = array('Tty' => true);
432+
$stream = $this->getMock('React\Stream\ReadableStreamInterface');
433+
434+
$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
435+
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
436+
$this->streamingParser->expects($this->never())->method('demultiplexStream');
423437

424438
$this->assertSame($stream, $this->client->execStartStream(123, $config));
425439
}

tests/FunctionalClientTest.php

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public function testExecInspectBeforeRunning($exec)
133133
*/
134134
public function testExecStartWhileRunning($exec)
135135
{
136-
$promise = $this->client->execStart($exec, true);
136+
$promise = $this->client->execStart($exec);
137137
$output = Block\await($promise, $this->loop);
138138

139139
$this->assertEquals('hello world', $output);
@@ -165,7 +165,7 @@ public function testExecStringCommandWithOutputWhileRunning($container)
165165
$this->assertTrue(is_array($exec));
166166
$this->assertTrue(is_string($exec['Id']));
167167

168-
$promise = $this->client->execStart($exec['Id'], true);
168+
$promise = $this->client->execStart($exec['Id']);
169169
$output = Block\await($promise, $this->loop);
170170

171171
$this->assertEquals('hello world', $output);
@@ -177,13 +177,13 @@ public function testExecStringCommandWithOutputWhileRunning($container)
177177
*/
178178
public function testExecUserSpecificCommandWithOutputWhileRunning($container)
179179
{
180-
$promise = $this->client->execCreate($container, 'whoami', true, false, true, true, 'nobody');
180+
$promise = $this->client->execCreate($container, 'whoami', false, false, true, true, 'nobody');
181181
$exec = Block\await($promise, $this->loop);
182182

183183
$this->assertTrue(is_array($exec));
184184
$this->assertTrue(is_string($exec['Id']));
185185

186-
$promise = $this->client->execStart($exec['Id'], true);
186+
$promise = $this->client->execStart($exec['Id']);
187187
$output = Block\await($promise, $this->loop);
188188

189189
$this->assertEquals('nobody', rtrim($output));
@@ -195,18 +195,39 @@ public function testExecUserSpecificCommandWithOutputWhileRunning($container)
195195
*/
196196
public function testExecStringCommandWithStderrOutputWhileRunning($container)
197197
{
198-
$promise = $this->client->execCreate($container, 'echo -n hello world >&2', true);
198+
$promise = $this->client->execCreate($container, 'echo -n hello world >&2');
199199
$exec = Block\await($promise, $this->loop);
200200

201201
$this->assertTrue(is_array($exec));
202202
$this->assertTrue(is_string($exec['Id']));
203203

204-
$promise = $this->client->execStart($exec['Id'], true);
204+
$promise = $this->client->execStart($exec['Id']);
205205
$output = Block\await($promise, $this->loop);
206206

207207
$this->assertEquals('hello world', $output);
208208
}
209209

210+
/**
211+
* @depends testStartRunning
212+
* @param string $container
213+
*/
214+
public function testExecStreamCommandWithTtyAndStderrOutputWhileRunning($container)
215+
{
216+
$promise = $this->client->execCreate($container, 'echo -n hello world >&2', true);
217+
$exec = Block\await($promise, $this->loop);
218+
219+
$this->assertTrue(is_array($exec));
220+
$this->assertTrue(is_string($exec['Id']));
221+
222+
$stream = $this->client->execStartStream($exec['Id'], true);
223+
//$stream->on('data', $this->expectCallableOnce());
224+
//$stream->on('end', $this->expectCallableOnce());
225+
226+
$output = Block\await(Stream\buffer($stream), $this->loop);
227+
228+
$this->assertEquals('hello world', $output);
229+
}
230+
210231
/**
211232
* @depends testStartRunning
212233
* @param string $container

tests/Io/StreamingParserTest.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,50 @@ public function testDeferredCancelingPromiseWillCloseStream()
121121
$stream->expects($this->once())->method('close');
122122
$promise->cancel();
123123
}
124+
125+
public function testDemultiplexStreamWillForwardEndAndClose()
126+
{
127+
$stream = new ReadableStream();
128+
129+
$out = $this->parser->demultiplexStream($stream);
130+
131+
$out->on('data', $this->expectCallableNever());
132+
$out->on('close', $this->expectCallableOnce());
133+
//$out->on('end', $this->expectCallableOnce());
134+
135+
$stream->emit('end', array());
136+
137+
$this->assertFalse($out->isReadable());
138+
}
139+
140+
public function testDemultiplexStreamWillForwardErrorAndClose()
141+
{
142+
$stream = new ReadableStream();
143+
144+
$out = $this->parser->demultiplexStream($stream);
145+
146+
$out->on('error', $this->expectCallableOnce());
147+
$out->on('close', $this->expectCallableOnce());
148+
//$out->on('end', $this->expectCallableNever());
149+
150+
$stream->emit('error', array(new \RuntimeException('Test')));
151+
152+
$this->assertFalse($out->isReadable());
153+
}
154+
155+
public function testDemultiplexStreamWillEmitErrorWhenEndingWithinStream()
156+
{
157+
$stream = new ReadableStream();
158+
159+
$out = $this->parser->demultiplexStream($stream);
160+
161+
//$out->on('error', $this->expectCallableOnce());
162+
$out->on('close', $this->expectCallableOnce());
163+
//$out->on('end', $this->expectCallableNever());
164+
165+
$stream->emit('data', array('XX'));
166+
$stream->emit('end', array());
167+
168+
$this->assertFalse($out->isReadable());
169+
}
124170
}

0 commit comments

Comments
 (0)