Skip to content

Commit 136f78b

Browse files
committed
Ensure LimitHandlersMiddleware pauses streaming requests
1 parent 9e484fd commit 136f78b

5 files changed

Lines changed: 356 additions & 4 deletions

File tree

README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Event-driven, streaming plaintext HTTP and secure HTTPS server for [ReactPHP](ht
1313
* [Request](#request)
1414
* [Response](#response)
1515
* [Middleware](#middleware)
16+
* [LimitHandlersMiddleware](#limithandlersmiddleware)
1617
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
1718
* [RequestBodyParserMiddleware](#requestbodyparsermiddleware)
1819
* [Third-Party Middleware](#third-party-middleware)
@@ -681,6 +682,59 @@ $server = new StreamingServer(new MiddlewareRunner([
681682
]));
682683
```
683684

685+
#### LimitHandlersMiddleware
686+
687+
The `LimitHandlersMiddleware` can be used to
688+
limit how many next handlers can be executed concurrently.
689+
690+
If this middleware is invoked, it will check if the number of pending
691+
handlers is below the allowed limit and then simply invoke the next handler
692+
and it will return whatever the next handler returns (or throws).
693+
694+
If the number of pending handlers exceeds the allowed limit, the request will
695+
be queued (and its streaming body will be paused) and it will return a pending
696+
promise.
697+
Once a pending handler returns (or throws), it will pick the oldest request
698+
from this queue and invokes the next handler (and its streaming body will be
699+
resumed).
700+
701+
The following example shows how this middleware can be used to ensure no more
702+
than 10 handlers will be invoked at once:
703+
704+
```php
705+
$server = new StreamingServer(new MiddlewareRunner([
706+
new LimitHandlersMiddleware(10),
707+
$handler
708+
]));
709+
```
710+
711+
Similarly, this middleware is often used in combination with the
712+
[`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
713+
to limit the total number of requests that can be buffered at once:
714+
715+
```php
716+
$server = new StreamingServer(new MiddlewareRunner([
717+
new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
718+
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
719+
new RequestBodyParserMiddleware(),
720+
$handler
721+
]));
722+
```
723+
724+
More sophisticated examples include limiting the total number of requests
725+
that can be buffered at once and then ensure the actual request handler only
726+
processes one request after another without any concurrency:
727+
728+
```php
729+
$server = new StreamingServer(new MiddlewareRunner([
730+
new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
731+
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
732+
new RequestBodyParserMiddleware(),
733+
new LimitHandlersMiddleware(1), // only execute 1 handler (no concurrency)
734+
$handler
735+
]));
736+
```
737+
684738
#### RequestBodyBufferMiddleware
685739

686740
One of the built-in middleware is the `RequestBodyBufferMiddleware` which
@@ -714,10 +768,18 @@ Similarly, this will immediately invoke the next middleware handler for requests
714768
that have an empty request body (such as a simple `GET` request) and requests
715769
that are already buffered (such as due to another middleware).
716770

771+
Note that the given buffer size limit is applied to each request individually.
772+
This means that if you allow a 2 MiB limit and then receive 1000 concurrent
773+
requests, up to 2000 MiB may be allocated for these buffers alone.
774+
As such, it's highly recommended to use this along with the
775+
[`LimitHandlersMiddleware`](#limithandlersmiddleware) (see above) to limit
776+
the total number of concurrent requests.
777+
717778
Usage:
718779

719780
```php
720781
$middlewares = new MiddlewareRunner([
782+
new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
721783
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
722784
function (ServerRequestInterface $request, callable $next) {
723785
// The body from $request->getBody() is now fully available without the need to stream it
@@ -776,6 +838,7 @@ $handler = function (ServerRequestInterface $request) {
776838
};
777839

778840
$server = new StreamingServer(new MiddlewareRunner([
841+
new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
779842
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
780843
new RequestBodyParserMiddleware(),
781844
$handler

examples/12-upload.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Psr\Http\Message\UploadedFileInterface;
1212
use React\EventLoop\Factory;
1313
use React\Http\MiddlewareRunner;
14+
use React\Http\Middleware\LimitHandlersMiddleware;
1415
use React\Http\Middleware\RequestBodyBufferMiddleware;
1516
use React\Http\Middleware\RequestBodyParserMiddleware;
1617
use React\Http\Response;
@@ -121,6 +122,7 @@
121122

122123
// buffer and parse HTTP request body before running our request handler
123124
$server = new StreamingServer(new MiddlewareRunner(array(
125+
new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers, queue otherwise
124126
new RequestBodyBufferMiddleware(8 * 1024 * 1024), // 8 MiB max, ignore body otherwise
125127
new RequestBodyParserMiddleware(100 * 1024, 1), // 1 file with 100 KiB max, reject upload otherwise
126128
$handler

src/Middleware/LimitHandlersMiddleware.php

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,65 @@
33
namespace React\Http\Middleware;
44

55
use Psr\Http\Message\ServerRequestInterface;
6+
use React\Http\Io\HttpBodyStream;
7+
use React\Http\Io\PauseBufferStream;
68
use React\Promise\Deferred;
79
use React\Stream\ReadableStreamInterface;
810
use SplQueue;
911

12+
/**
13+
* Limits how many next handlers can be executed concurrently.
14+
*
15+
* If this middleware is invoked, it will check if the number of pending
16+
* handlers is below the allowed limit and then simply invoke the next handler
17+
* and it will return whatever the next handler returns (or throws).
18+
*
19+
* If the number of pending handlers exceeds the allowed limit, the request will
20+
* be queued (and its streaming body will be paused) and it will return a pending
21+
* promise.
22+
* Once a pending handler returns (or throws), it will pick the oldest request
23+
* from this queue and invokes the next handler (and its streaming body will be
24+
* resumed).
25+
*
26+
* The following example shows how this middleware can be used to ensure no more
27+
* than 10 handlers will be invoked at once:
28+
*
29+
* ```php
30+
* $server = new StreamingServer(new MiddlewareRunner([
31+
* new LimitHandlersMiddleware(10),
32+
* $handler
33+
* ]));
34+
* ```
35+
*
36+
* Similarly, this middleware is often used in combination with the
37+
* [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
38+
* to limit the total number of requests that can be buffered at once:
39+
*
40+
* ```php
41+
* $server = new StreamingServer(new MiddlewareRunner([
42+
* new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
43+
* new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
44+
* new RequestBodyParserMiddleware(),
45+
* $handler
46+
* ]));
47+
* ```
48+
*
49+
* More sophisticated examples include limiting the total number of requests
50+
* that can be buffered at once and then ensure the actual request handler only
51+
* processes one request after another without any concurrency:
52+
*
53+
* ```php
54+
* $server = new StreamingServer(new MiddlewareRunner([
55+
* new LimitHandlersMiddleware(100), // 100 concurrent buffering handlers
56+
* new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
57+
* new RequestBodyParserMiddleware(),
58+
* new LimitHandlersMiddleware(1), // only execute 1 handler (no concurrency)
59+
* $handler
60+
* ]));
61+
* ```
62+
*
63+
* @see RequestBodyBufferMiddleware
64+
*/
1065
final class LimitHandlersMiddleware
1166
{
1267
private $limit;
@@ -29,6 +84,14 @@ public function __invoke(ServerRequestInterface $request, $next)
2984
{
3085
$body = $request->getBody();
3186
if ($body instanceof ReadableStreamInterface) {
87+
// replace with buffering body to ensure any readable events will be buffered
88+
$body = new HttpBodyStream(
89+
new PauseBufferStream($body),
90+
$body->getSize()
91+
);
92+
93+
// pause actual body to stop emitting data until the handler is called
94+
$request = $request->withBody($body);
3295
$body->pause();
3396
}
3497
$deferred = new Deferred();
@@ -40,11 +103,16 @@ public function __invoke(ServerRequestInterface $request, $next)
40103
$pending = &$this->pending;
41104
return $deferred->promise()->then(function () use ($request, $next, &$pending) {
42105
$pending++;
106+
107+
$ret = $next($request);
108+
109+
// resume readable stream and replay buffered events
43110
$body = $request->getBody();
44111
if ($body instanceof ReadableStreamInterface) {
45112
$body->resume();
46113
}
47-
return $next($request);
114+
115+
return $ret;
48116
})->then(function ($response) use ($that, &$pending) {
49117
$pending--;
50118
$that->processQueue();

tests/FunctionalServerTest.php

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace React\Tests\Http;
44

5+
use Psr\Http\Message\ServerRequestInterface;
6+
use React\Http\Middleware\LimitHandlersMiddleware;
7+
use React\Http\Middleware\RequestBodyBufferMiddleware;
58
use React\Http\MiddlewareRunner;
69
use React\Socket\Server as Socket;
710
use React\EventLoop\Factory;
@@ -12,7 +15,7 @@
1215
use Clue\React\Block;
1316
use React\Http\Response;
1417
use React\Socket\SecureServer;
15-
use React\Promise\Promise;
18+
use React\Promise;
1619
use React\Promise\Stream;
1720
use React\Stream\ThroughStream;
1821

@@ -653,7 +656,7 @@ public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGive
653656
$stream->end();
654657
});
655658

656-
return new Promise(function ($resolve) use ($loop, $stream) {
659+
return new Promise\Promise(function ($resolve) use ($loop, $stream) {
657660
$loop->addTimer(0.001, function () use ($resolve, $stream) {
658661
$resolve(new Response(200, array(), $stream));
659662
});
@@ -715,6 +718,53 @@ public function testConnectWithClosedThroughStreamReturnsNoData()
715718

716719
$socket->close();
717720
}
721+
722+
public function testLimitHandlersMiddlewareRequestStreamPausing()
723+
{
724+
$loop = Factory::create();
725+
$connector = new Connector($loop);
726+
727+
$server = new StreamingServer(new MiddlewareRunner(array(
728+
new LimitHandlersMiddleware(5),
729+
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
730+
function (ServerRequestInterface $request, $next) use ($loop) {
731+
return new Promise\Promise(function ($resolve) use ($request, $loop, $next) {
732+
$loop->addTimer(0.1, function () use ($request, $resolve, $next) {
733+
$resolve($next($request));
734+
});
735+
});
736+
},
737+
function (ServerRequestInterface $request) {
738+
return new Response(200, array(), (string)strlen((string)$request->getBody()));
739+
}
740+
)));
741+
742+
$socket = new Socket(0, $loop);
743+
$server->listen($socket);
744+
745+
$result = array();
746+
for ($i = 0; $i < 6; $i++) {
747+
$result[] = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) {
748+
$conn->write(
749+
"GET / HTTP/1.0\r\nContent-Length: 1024\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n" .
750+
str_repeat('a', 1024) .
751+
"\r\n\r\n"
752+
);
753+
754+
return Stream\buffer($conn);
755+
});
756+
}
757+
758+
$responses = Block\await(Promise\all($result), $loop, 1.0);
759+
760+
foreach ($responses as $response) {
761+
$this->assertContains("HTTP/1.0 200 OK", $response, $response);
762+
$this->assertTrue(substr($response, -4) == 1024, $response);
763+
}
764+
765+
$socket->close();
766+
}
767+
718768
}
719769

720770
function noScheme($uri)

0 commit comments

Comments
 (0)