Skip to content

Commit 9e484fd

Browse files
committed
Add PauseBufferStream decorator to buffer events for paused streams
1 parent c15a687 commit 9e484fd

2 files changed

Lines changed: 397 additions & 0 deletions

File tree

src/Io/PauseBufferStream.php

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
<?php
2+
3+
namespace React\Http\Io;
4+
5+
use Evenement\EventEmitter;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
10+
/**
11+
* [Internal] Pauses a given stream and buffers all events while paused
12+
*
13+
* This class is used to buffer all events that happen on a given stream while
14+
* it is paused. This allows you to pause a stream and no longer watch for any
15+
* of its events. Once the stream is resumed, all buffered events will be
16+
* emitted. Explicitly closing the resulting stream clears all buffers.
17+
*
18+
* Note that this is an internal class only and nothing you should usually care
19+
* about.
20+
*
21+
* @see ReadableStreamInterface
22+
* @internal
23+
*/
24+
class PauseBufferStream extends EventEmitter implements ReadableStreamInterface
25+
{
26+
private $input;
27+
private $closed = false;
28+
private $paused = false;
29+
private $dataPaused = '';
30+
private $endPaused = false;
31+
private $closePaused = false;
32+
private $errorPaused = null;
33+
34+
public function __construct(ReadableStreamInterface $input)
35+
{
36+
$this->input = $input;
37+
38+
$this->input->on('data', array($this, 'handleData'));
39+
$this->input->on('end', array($this, 'handleEnd'));
40+
$this->input->on('error', array($this, 'handleError'));
41+
$this->input->on('close', array($this, 'handleClose'));
42+
}
43+
44+
public function isReadable()
45+
{
46+
return !$this->closed;
47+
}
48+
49+
public function pause()
50+
{
51+
if ($this->closed) {
52+
return;
53+
}
54+
55+
$this->input->pause();
56+
$this->paused = true;
57+
}
58+
59+
public function resume()
60+
{
61+
if ($this->closed) {
62+
return;
63+
}
64+
65+
$this->paused = false;
66+
67+
if ($this->dataPaused !== '') {
68+
$this->emit('data', array($this->dataPaused));
69+
$this->dataPaused = '';
70+
}
71+
72+
if ($this->errorPaused) {
73+
$this->emit('error', array($this->errorPaused));
74+
return $this->close();
75+
}
76+
77+
if ($this->endPaused) {
78+
$this->endPaused = false;
79+
$this->emit('end');
80+
return $this->close();
81+
}
82+
83+
if ($this->closePaused) {
84+
$this->closePaused = false;
85+
return $this->close();
86+
}
87+
88+
$this->input->resume();
89+
}
90+
91+
public function pipe(WritableStreamInterface $dest, array $options = array())
92+
{
93+
Util::pipe($this, $dest, $options);
94+
95+
return $dest;
96+
}
97+
98+
public function close()
99+
{
100+
if ($this->closed) {
101+
return;
102+
}
103+
104+
$this->closed = true;
105+
$this->dataPaused = '';
106+
$this->endPaused = $this->closePaused = false;
107+
$this->errorPaused = null;
108+
109+
$this->input->close();
110+
111+
$this->emit('close');
112+
$this->removeAllListeners();
113+
}
114+
115+
/** @internal */
116+
public function handleData($data)
117+
{
118+
if ($this->paused) {
119+
$this->dataPaused .= $data;
120+
return;
121+
}
122+
123+
$this->emit('data', array($data));
124+
}
125+
126+
/** @internal */
127+
public function handleError(\Exception $e)
128+
{
129+
if ($this->paused) {
130+
$this->errorPaused = $e;
131+
return;
132+
}
133+
134+
$this->emit('error', array($e));
135+
$this->close();
136+
}
137+
138+
/** @internal */
139+
public function handleEnd()
140+
{
141+
if ($this->paused) {
142+
$this->endPaused = true;
143+
return;
144+
}
145+
146+
if (!$this->closed) {
147+
$this->emit('end');
148+
$this->close();
149+
}
150+
}
151+
152+
/** @internal */
153+
public function handleClose()
154+
{
155+
if ($this->paused) {
156+
$this->closePaused = true;
157+
return;
158+
}
159+
160+
$this->close();
161+
}
162+
}

tests/Io/PauseBufferStreamTest.php

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
<?php
2+
3+
namespace React\Tests\Io;
4+
5+
use React\Tests\Http\TestCase;
6+
use React\Stream\ThroughStream;
7+
use React\Http\Io\PauseBufferStream;
8+
9+
class PauseBufferStreamTest extends TestCase
10+
{
11+
public function testPauseMethodWillBePassedThroughToInput()
12+
{
13+
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
14+
$input->expects($this->once())->method('pause');
15+
16+
$stream = new PauseBufferStream($input);
17+
$stream->pause();
18+
}
19+
20+
public function testCloseMethodWillBePassedThroughToInput()
21+
{
22+
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
23+
$input->expects($this->once())->method('close');
24+
25+
$stream = new PauseBufferStream($input);
26+
$stream->close();
27+
}
28+
29+
public function testPauseMethodWillNotBePassedThroughToInputAfterClose()
30+
{
31+
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
32+
$input->expects($this->never())->method('pause');
33+
34+
$stream = new PauseBufferStream($input);
35+
$stream->close();
36+
$stream->pause();
37+
}
38+
39+
public function testDataEventWillBePassedThroughAsIs()
40+
{
41+
$input = new ThroughStream();
42+
$stream = new PauseBufferStream($input);
43+
44+
$stream->on('data', $this->expectCallableOnceWith('hello'));
45+
$input->write('hello');
46+
}
47+
48+
public function testDataEventWillBePipedThroughAsIs()
49+
{
50+
$input = new ThroughStream();
51+
$stream = new PauseBufferStream($input);
52+
53+
$output = new ThroughStream($this->expectCallableOnceWith('hello'));
54+
$stream->pipe($output);
55+
56+
$input->write('hello');
57+
}
58+
59+
public function testPausedStreamWillNotPassThroughDataEvent()
60+
{
61+
$input = new ThroughStream();
62+
$stream = new PauseBufferStream($input);
63+
64+
$stream->pause();
65+
$stream->on('data', $this->expectCallableNever());
66+
$input->write('hello');
67+
}
68+
69+
public function testPauseStreamWillNotPipeThroughDataEvent()
70+
{
71+
$input = new ThroughStream();
72+
$stream = new PauseBufferStream($input);
73+
74+
$output = new ThroughStream($this->expectCallableNever());
75+
$stream->pipe($output);
76+
77+
$stream->pause();
78+
$input->write('hello');
79+
}
80+
81+
public function testPausedStreamWillPassThroughDataEventOnResume()
82+
{
83+
$input = new ThroughStream();
84+
$stream = new PauseBufferStream($input);
85+
86+
$stream->pause();
87+
$input->write('hello');
88+
89+
$stream->on('data', $this->expectCallableOnceWith('hello'));
90+
$stream->resume();
91+
}
92+
93+
public function testEndEventWillBePassedThroughAsIs()
94+
{
95+
$input = new ThroughStream();
96+
$stream = new PauseBufferStream($input);
97+
98+
$stream->on('data', $this->expectCallableOnceWith('hello'));
99+
$stream->on('end', $this->expectCallableOnce());
100+
$stream->on('close', $this->expectCallableOnce());
101+
$input->end('hello');
102+
103+
$this->assertFalse($stream->isReadable());
104+
}
105+
106+
public function testPausedStreamWillNotPassThroughEndEvent()
107+
{
108+
$input = new ThroughStream();
109+
$stream = new PauseBufferStream($input);
110+
111+
$stream->pause();
112+
$stream->on('data', $this->expectCallableNever());
113+
$stream->on('end', $this->expectCallableNever());
114+
$stream->on('close', $this->expectCallableNever());
115+
$input->end('hello');
116+
117+
$this->assertTrue($stream->isReadable());
118+
}
119+
120+
public function testPausedStreamWillPassThroughEndEventOnResume()
121+
{
122+
$input = new ThroughStream();
123+
$stream = new PauseBufferStream($input);
124+
125+
$stream->pause();
126+
$input->end('hello');
127+
128+
$stream->on('data', $this->expectCallableOnceWith('hello'));
129+
$stream->on('end', $this->expectCallableOnce());
130+
$stream->on('close', $this->expectCallableOnce());
131+
$stream->resume();
132+
133+
$this->assertFalse($stream->isReadable());
134+
}
135+
136+
public function testPausedStreamWillNotPassThroughEndEventOnExplicitClose()
137+
{
138+
$input = new ThroughStream();
139+
$stream = new PauseBufferStream($input);
140+
141+
$stream->pause();
142+
$stream->on('data', $this->expectCallableNever());
143+
$stream->on('end', $this->expectCallableNever());
144+
$stream->on('close', $this->expectCallableOnce());
145+
$input->end('hello');
146+
147+
$stream->close();
148+
149+
$this->assertFalse($stream->isReadable());
150+
}
151+
152+
public function testErrorEventWillBePassedThroughAsIs()
153+
{
154+
$input = new ThroughStream();
155+
$stream = new PauseBufferStream($input);
156+
157+
$stream->on('error', $this->expectCallableOnce());
158+
$stream->on('close', $this->expectCallableOnce());
159+
$input->emit('error', array(new \RuntimeException()));
160+
}
161+
162+
public function testPausedStreamWillNotPassThroughErrorEvent()
163+
{
164+
$input = new ThroughStream();
165+
$stream = new PauseBufferStream($input);
166+
167+
$stream->pause();
168+
$stream->on('error', $this->expectCallableNever());
169+
$stream->on('close', $this->expectCallableNever());
170+
$input->emit('error', array(new \RuntimeException()));
171+
}
172+
173+
public function testPausedStreamWillPassThroughErrorEventOnResume()
174+
{
175+
$input = new ThroughStream();
176+
$stream = new PauseBufferStream($input);
177+
178+
$stream->pause();
179+
$input->emit('error', array(new \RuntimeException()));
180+
181+
$stream->on('error', $this->expectCallableOnce());
182+
$stream->on('close', $this->expectCallableOnce());
183+
$stream->resume();
184+
}
185+
186+
public function testPausedStreamWillNotPassThroughErrorEventOnExplicitClose()
187+
{
188+
$input = new ThroughStream();
189+
$stream = new PauseBufferStream($input);
190+
191+
$stream->pause();
192+
$stream->on('error', $this->expectCallableNever());
193+
$stream->on('close', $this->expectCallableOnce());
194+
$input->emit('error', array(new \RuntimeException()));
195+
196+
$stream->close();
197+
}
198+
199+
public function testCloseEventWillBePassedThroughAsIs()
200+
{
201+
$input = new ThroughStream();
202+
$stream = new PauseBufferStream($input);
203+
204+
$stream->on('data', $this->expectCallableNever());
205+
$stream->on('end', $this->expectCallableNever());
206+
$stream->on('close', $this->expectCallableOnce());
207+
$input->close();
208+
}
209+
210+
public function testPausedStreamWillNotPassThroughCloseEvent()
211+
{
212+
$input = new ThroughStream();
213+
$stream = new PauseBufferStream($input);
214+
215+
$stream->pause();
216+
$stream->on('data', $this->expectCallableNever());
217+
$stream->on('end', $this->expectCallableNever());
218+
$stream->on('close', $this->expectCallableNever());
219+
$input->close();
220+
}
221+
222+
public function testPausedStreamWillPassThroughCloseEventOnResume()
223+
{
224+
$input = new ThroughStream();
225+
$stream = new PauseBufferStream($input);
226+
227+
$stream->pause();
228+
$input->close();
229+
230+
$stream->on('data', $this->expectCallableNever());
231+
$stream->on('end', $this->expectCallableNever());
232+
$stream->on('close', $this->expectCallableOnce());
233+
$stream->resume();
234+
}
235+
}

0 commit comments

Comments
 (0)