Skip to content

Commit 38924cf

Browse files
committed
Use wrapper for demultiplexing (supports back pressure and is faster)
1 parent 9e87c39 commit 38924cf

7 files changed

Lines changed: 250 additions & 225 deletions

src/Io/MultiplexStreamParser.php

Lines changed: 0 additions & 72 deletions
This file was deleted.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
use React\Stream\ReadableStreamInterface;
6+
use Evenement\EventEmitter;
7+
use React\Stream\WritableStreamInterface;
8+
use React\Stream\Util;
9+
/**
10+
* Parser for Docker's own frame format used for bidrectional frames
11+
*
12+
* Each frame consists of a simple header containing the stream identifier and the payload length
13+
* plus the actual payload string.
14+
*
15+
* @internal
16+
* @link https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
17+
*/
18+
class ReadableDemultiplexStream extends EventEmitter implements ReadableStreamInterface
19+
{
20+
private $buffer = '';
21+
private $closed = false;
22+
private $multiplexed;
23+
private $stderrEvent;
24+
25+
public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent = null)
26+
{
27+
$this->multiplexed = $multiplexed;
28+
29+
if ($stderrEvent === null) {
30+
$stderrEvent = 'data';
31+
}
32+
33+
$this->stderrEvent = $stderrEvent;
34+
35+
$out = $this;
36+
$buffer =& $this->buffer;
37+
$closed =& $this->closed;
38+
39+
// pass all input data chunks through the parser
40+
$multiplexed->on('data', array($out, 'push'));
41+
42+
// forward end event to output (unless parsing is still in progress)
43+
$multiplexed->on('end', function () use (&$buffer, $out, &$closed) {
44+
// ignore duplicate end events
45+
if ($closed) {
46+
return;
47+
}
48+
49+
// buffer must be empty on end, otherwise this is an error situation
50+
if ($buffer === '') {
51+
$out->emit('end', array());
52+
} else {
53+
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
54+
}
55+
$out->close();
56+
});
57+
58+
// forward error event to output
59+
$multiplexed->on('error', function ($error) use ($out) {
60+
$out->emit('error', array($error));
61+
$out->close();
62+
});
63+
64+
// forward close event to output
65+
$multiplexed->on('close', function ($error) use ($out) {
66+
$out->close();
67+
});
68+
}
69+
70+
/**
71+
* push the given stream chunk into the parser buffer and try to extract all frames
72+
*
73+
* @internal
74+
* @param string $chunk
75+
*/
76+
public function push($chunk)
77+
{
78+
$this->buffer .= $chunk;
79+
80+
while ($this->buffer !== '') {
81+
if (!isset($this->buffer[7])) {
82+
// last header byte not set => no complete header in buffer
83+
break;
84+
}
85+
86+
$header = unpack('Cstream/x/x/x/Nlength', substr($this->buffer, 0, 8));
87+
88+
if (!isset($this->buffer[7 + $header['length']])) {
89+
// last payload byte not set => message payload is incomplete
90+
break;
91+
}
92+
93+
$payload = substr($this->buffer, 8, $header['length']);
94+
$this->buffer = (string)substr($this->buffer, 8 + $header['length']);
95+
96+
$this->emit(
97+
($header['stream'] === 2) ? $this->stderrEvent : 'data',
98+
array($payload)
99+
);
100+
}
101+
}
102+
103+
public function pause()
104+
{
105+
$this->multiplexed->pause();
106+
}
107+
108+
public function resume()
109+
{
110+
$this->multiplexed->resume();
111+
}
112+
113+
public function isReadable()
114+
{
115+
return $this->multiplexed->isReadable();
116+
}
117+
118+
public function pipe(WritableStreamInterface $dest, array $options = array())
119+
{
120+
return Util::pipe($this, $dest, $options);
121+
}
122+
123+
public function close()
124+
{
125+
if ($this->closed) {
126+
return;
127+
}
128+
129+
$this->closed = true;
130+
131+
// closing output stream closes input stream
132+
$this->multiplexed->close();
133+
134+
$this->emit('close', array());
135+
}
136+
}

src/Io/StreamingParser.php

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -93,52 +93,7 @@ public function parsePlainStream(PromiseInterface $promise)
9393
*/
9494
public function demultiplexStream(ReadableStreamInterface $input, $stderrEvent = null)
9595
{
96-
if ($stderrEvent === null) {
97-
$stderrEvent = 'data';
98-
}
99-
100-
$out = new ReadableStream();
101-
$parser = new MultiplexStreamParser();
102-
103-
// pass all input data chunks through the parser
104-
$input->on('data', function ($chunk) use ($parser, $out, $stderrEvent) {
105-
// once parser emits, forward to output stream
106-
$parser->push($chunk, function ($stream, $data) use ($out, $stderrEvent) {
107-
if ($stream === 2) {
108-
$out->emit($stderrEvent, array($data));
109-
} else {
110-
$out->emit('data', array($data));
111-
}
112-
});
113-
});
114-
115-
// forward end event to output (unless parsing is still in progress)
116-
$input->on('end', function () use ($out, $parser) {
117-
if ($parser->isEmpty()) {
118-
$out->emit('end', array());
119-
} else {
120-
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
121-
}
122-
$out->close();
123-
});
124-
125-
// forward error event to output
126-
$input->on('error', function ($error) use ($out) {
127-
$out->emit('error', array($error));
128-
$out->close();
129-
});
130-
131-
// forward close event to output
132-
$input->on('close', function ($error) use ($out) {
133-
$out->close();
134-
});
135-
136-
// closing output stream closes input stream
137-
$out->on('close', function () use ($input) {
138-
$input->close();
139-
});
140-
141-
return $out;
96+
return new ReadableDemultiplexStream($input, $stderrEvent);
14297
}
14398

14499
/**

tests/FunctionalClientTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public function testExecStreamOutputInMultipleChunksWhileRunning($container)
185185

186186
$stream = $this->client->execStartStream($exec['Id']);
187187
$stream->once('data', $this->expectCallableOnceWith('hello'));
188-
//$stream->on('end', $this->expectCallableOnce());
188+
$stream->on('end', $this->expectCallableOnce());
189189

190190
$output = Block\await(Stream\buffer($stream), $this->loop);
191191

@@ -241,8 +241,8 @@ public function testExecStreamCommandWithTtyAndStderrOutputWhileRunning($contain
241241
$this->assertTrue(is_string($exec['Id']));
242242

243243
$stream = $this->client->execStartStream($exec['Id'], true);
244-
//$stream->on('data', $this->expectCallableOnce());
245-
//$stream->on('end', $this->expectCallableOnce());
244+
$stream->once('data', $this->expectCallableOnce('hello world'));
245+
$stream->on('end', $this->expectCallableOnce());
246246

247247
$output = Block\await(Stream\buffer($stream), $this->loop);
248248

@@ -265,7 +265,7 @@ public function testExecStreamStderrCustomEventWhileRunning($container)
265265
$stream->on('err', $this->expectCallableOnceWith('hello world'));
266266
$stream->on('data', $this->expectCallableNever());
267267
$stream->on('error', $this->expectCallableNever());
268-
//$stream->on('end', $this->expectCallableOnce());
268+
$stream->on('end', $this->expectCallableOnce());
269269

270270
$output = Block\await(Stream\buffer($stream), $this->loop);
271271

tests/Io/MultiplexStreamParserTest.php

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)