Skip to content

Commit 955bf73

Browse files
committed
Do not resume() stream if handler explicitly calls pause()
1 parent 136f78b commit 955bf73

3 files changed

Lines changed: 125 additions & 13 deletions

File tree

src/Io/PauseBufferStream.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class PauseBufferStream extends EventEmitter implements ReadableStreamInterface
3030
private $endPaused = false;
3131
private $closePaused = false;
3232
private $errorPaused = null;
33+
private $implicit = false;
3334

3435
public function __construct(ReadableStreamInterface $input)
3536
{
@@ -41,6 +42,29 @@ public function __construct(ReadableStreamInterface $input)
4142
$this->input->on('close', array($this, 'handleClose'));
4243
}
4344

45+
/**
46+
* pause and remember this was not explicitly from user control
47+
*
48+
* @internal
49+
*/
50+
public function pauseImplicit()
51+
{
52+
$this->pause();
53+
$this->implicit = true;
54+
}
55+
56+
/**
57+
* resume only if this was previously paused implicitly and not explicitly from user control
58+
*
59+
* @internal
60+
*/
61+
public function resumeImplicit()
62+
{
63+
if ($this->implicit) {
64+
$this->resume();
65+
}
66+
}
67+
4468
public function isReadable()
4569
{
4670
return !$this->closed;
@@ -54,6 +78,7 @@ public function pause()
5478

5579
$this->input->pause();
5680
$this->paused = true;
81+
$this->implicit = false;
5782
}
5883

5984
public function resume()
@@ -63,6 +88,7 @@ public function resume()
6388
}
6489

6590
$this->paused = false;
91+
$this->implicit = false;
6692

6793
if ($this->dataPaused !== '') {
6894
$this->emit('data', array($this->dataPaused));

src/Middleware/LimitHandlersMiddleware.php

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,32 +84,33 @@ public function __invoke(ServerRequestInterface $request, $next)
8484
{
8585
$body = $request->getBody();
8686
if ($body instanceof ReadableStreamInterface) {
87-
// replace with buffering body to ensure any readable events will be buffered
88-
$body = new HttpBodyStream(
89-
new PauseBufferStream($body),
90-
$body->getSize()
91-
);
92-
9387
// pause actual body to stop emitting data until the handler is called
94-
$request = $request->withBody($body);
95-
$body->pause();
88+
$size = $body->getSize();
89+
$body = new PauseBufferStream($body);
90+
$body->pauseImplicit();
91+
92+
// replace with buffering body to ensure any readable events will be buffered
93+
$request = $request->withBody(new HttpBodyStream(
94+
$body,
95+
$size
96+
));
9697
}
98+
9799
$deferred = new Deferred();
98100
$this->queued->enqueue($deferred);
99101

100102
$this->processQueue();
101103

102104
$that = $this;
103105
$pending = &$this->pending;
104-
return $deferred->promise()->then(function () use ($request, $next, &$pending) {
106+
return $deferred->promise()->then(function () use ($request, $next, $body, &$pending) {
105107
$pending++;
106108

107109
$ret = $next($request);
108110

109111
// resume readable stream and replay buffered events
110-
$body = $request->getBody();
111-
if ($body instanceof ReadableStreamInterface) {
112-
$body->resume();
112+
if ($body instanceof PauseBufferStream) {
113+
$body->resumeImplicit();
113114
}
114115

115116
return $ret;

tests/Middleware/LimitHandlersMiddlewareTest.php

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ public function testReceivesNextStreamingBodyWithBufferedDataAfterPreviousHandle
255255
new HttpBodyStream($stream, 10)
256256
);
257257

258-
$req = null;
259258
$once = $this->expectCallableOnceWith('helloworld');
260259
$middleware($request, function (ServerRequestInterface $request) use ($once) {
261260
$request->getBody()->on('data', $once);
@@ -266,4 +265,90 @@ public function testReceivesNextStreamingBodyWithBufferedDataAfterPreviousHandle
266265

267266
$deferred->reject(new \RuntimeException());
268267
}
268+
269+
public function testReceivesNextStreamingBodyAndDoesNotEmitDataIfExplicitlyClosed()
270+
{
271+
$deferred = new Deferred();
272+
$middleware = new LimitHandlersMiddleware(1);
273+
$middleware(new ServerRequest('GET', 'http://example.com/'), function () use ($deferred) {
274+
return $deferred->promise();
275+
});
276+
277+
$stream = new ThroughStream();
278+
$request = new ServerRequest(
279+
'POST',
280+
'http://example.com/',
281+
array(),
282+
new HttpBodyStream($stream, 10)
283+
);
284+
285+
$never = $this->expectCallableNever();
286+
$middleware($request, function (ServerRequestInterface $request) use ($never) {
287+
$request->getBody()->close();
288+
$request->getBody()->on('data', $never);
289+
});
290+
291+
$stream->write('hello');
292+
$stream->write('world');
293+
294+
$deferred->reject(new \RuntimeException());
295+
}
296+
297+
public function testReceivesNextStreamingBodyAndDoesNotEmitDataIfExplicitlyPaused()
298+
{
299+
$deferred = new Deferred();
300+
$middleware = new LimitHandlersMiddleware(1);
301+
$middleware(new ServerRequest('GET', 'http://example.com/'), function () use ($deferred) {
302+
return $deferred->promise();
303+
});
304+
305+
$stream = new ThroughStream();
306+
$request = new ServerRequest(
307+
'POST',
308+
'http://example.com/',
309+
array(),
310+
new HttpBodyStream($stream, 10)
311+
);
312+
313+
$never = $this->expectCallableNever();
314+
$middleware($request, function (ServerRequestInterface $request) use ($never) {
315+
$request->getBody()->pause();
316+
$request->getBody()->on('data', $never);
317+
});
318+
319+
$stream->write('hello');
320+
$stream->write('world');
321+
322+
$deferred->reject(new \RuntimeException());
323+
}
324+
325+
public function testReceivesNextStreamingBodyAndDoesEmitDataImmediatelyIfExplicitlyResumed()
326+
{
327+
$deferred = new Deferred();
328+
$middleware = new LimitHandlersMiddleware(1);
329+
$middleware(new ServerRequest('GET', 'http://example.com/'), function () use ($deferred) {
330+
return $deferred->promise();
331+
});
332+
333+
$stream = new ThroughStream();
334+
$request = new ServerRequest(
335+
'POST',
336+
'http://example.com/',
337+
array(),
338+
new HttpBodyStream($stream, 10)
339+
);
340+
341+
$once = $this->expectCallableOnceWith('helloworld');
342+
$never = $this->expectCallableNever();
343+
$middleware($request, function (ServerRequestInterface $request) use ($once, $never) {
344+
$request->getBody()->on('data', $once);
345+
$request->getBody()->resume();
346+
$request->getBody()->on('data', $never);
347+
});
348+
349+
$stream->write('hello');
350+
$stream->write('world');
351+
352+
$deferred->reject(new \RuntimeException());
353+
}
269354
}

0 commit comments

Comments
 (0)