Skip to content

Commit 7339e16

Browse files
committed
Add streaming multiplex parser
1 parent da95147 commit 7339e16

2 files changed

Lines changed: 136 additions & 0 deletions

File tree

src/Io/MultiplexStreamParser.php

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?php
2+
3+
namespace Clue\React\Docker\Io;
4+
5+
/**
6+
* Parser for Docker's own frame format used for bidrectional frames
7+
*
8+
* Each frame consists of a simple header containing the stream identifier and the payload length
9+
* plus the actual payload string.
10+
*
11+
* @internal
12+
* @link https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
13+
*/
14+
class MultiplexStreamParser
15+
{
16+
private $buffer = '';
17+
18+
/**
19+
* push the given stream chunk into the parser buffer and try to extract all frames
20+
*
21+
* The given $callback parameter will be invoked for each individual frame
22+
* with the following signature: $callback($stream, $payload)
23+
*
24+
* @param string $chunk
25+
* @param callable $callback
26+
*/
27+
public function push($chunk, $callback)
28+
{
29+
$this->buffer .= $chunk;
30+
31+
while ($this->buffer !== '') {
32+
if (!isset($this->buffer[7])) {
33+
// last header byte not set => no complete header in buffer
34+
break;
35+
}
36+
37+
$header = unpack('Cstream/x/x/x/Nlength', substr($this->buffer, 0, 8));
38+
39+
if (!isset($this->buffer[7 + $header['length']])) {
40+
// last payload byte not set => message payload is incomplete
41+
break;
42+
}
43+
44+
$payload = substr($this->buffer, 8, $header['length']);
45+
$this->buffer = (string)substr($this->buffer, 8 + $header['length']);
46+
47+
$callback($header['stream'], $payload);
48+
}
49+
}
50+
51+
/**
52+
* checks whether the incoming frame buffer is empty
53+
*
54+
* @return boolean
55+
*/
56+
public function isEmpty()
57+
{
58+
return ($this->buffer === '');
59+
}
60+
61+
/**
62+
* creates a new outgoing frame
63+
*
64+
* @param int $stream
65+
* @param string $payload
66+
* @return string
67+
*/
68+
public function createFrame($stream, $payload)
69+
{
70+
return pack('CxxxN', $stream, strlen($payload)) . $payload;
71+
}
72+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php
2+
3+
use Clue\React\Docker\Io\MultiplexStreamParser;
4+
5+
class MultiplexStreamParserTest extends TestCase
6+
{
7+
private $parser;
8+
9+
public function setUp()
10+
{
11+
$this->parser = new MultiplexStreamParser();
12+
}
13+
14+
public function testEmpty()
15+
{
16+
$this->assertTrue($this->parser->isEmpty());
17+
}
18+
19+
public function testCompleteFrame()
20+
{
21+
$frame = $this->parser->createFrame(1, 'test');
22+
23+
$this->assertEquals("\x01\x00\x00\x00" . "\x00\x00\x00\x04" . "test", $frame);
24+
}
25+
26+
public function testParseFrame()
27+
{
28+
$frame = $this->parser->createFrame(1, 'hello world');
29+
$this->parser->push($frame, $this->expectCallableOnceWith(1, 'hello world'));
30+
31+
$this->assertTrue($this->parser->isEmpty());
32+
}
33+
34+
public function testIncompleteHeader()
35+
{
36+
$this->parser->push("\x01\0\0\0", $this->expectCallableNever());
37+
$this->assertFalse($this->parser->isEmpty());
38+
39+
return $this->parser;
40+
}
41+
42+
/**
43+
* @depends testIncompleteHeader
44+
* @param MultiplexStreamParser $parser
45+
*/
46+
public function testIncompletePayload(MultiplexStreamParser $parser)
47+
{
48+
$parser->push("\0\0\0\x04te", $this->expectCallableNever());
49+
$this->assertFalse($parser->isEmpty());
50+
51+
return $parser;
52+
}
53+
54+
/**
55+
* @depends testIncompletePayload
56+
* @param MultiplexStreamParser $parser
57+
*/
58+
public function testChunkedFrame(MultiplexStreamParser $parser)
59+
{
60+
$parser->push('st', $this->expectCallableOnceWith(1, 'test'));
61+
62+
$this->assertTrue($this->parser->isEmpty());
63+
}
64+
}

0 commit comments

Comments
 (0)