Skip to content

Commit 262bd0f

Browse files
authored
Merge pull request #272 from clue-labs/limit-handlers
Add LimitConcurrentRequestsMiddleware to limit how many next handlers can be executed concurrently
2 parents 54033d8 + dc60cdd commit 262bd0f

7 files changed

Lines changed: 1123 additions & 2 deletions

File tree

README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Event-driven, streaming plaintext HTTP and secure HTTPS server for [ReactPHP](ht
1313
* [Request](#request)
1414
* [Response](#response)
1515
* [Middleware](#middleware)
16+
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
1617
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
1718
* [RequestBodyParserMiddleware](#requestbodyparsermiddleware)
1819
* [Third-Party Middleware](#third-party-middleware)
@@ -681,6 +682,59 @@ $server = new StreamingServer(new MiddlewareRunner([
681682
]));
682683
```
683684

685+
#### LimitConcurrentRequestsMiddleware
686+
687+
The `LimitConcurrentRequestsMiddleware` can be used to
688+
limit how many next handlers can be executed concurrently.
689+
690+
If this middleware is invoked, it will check if the number of pending
691+
handlers is below the allowed limit and then simply invoke the next handler
692+
and it will return whatever the next handler returns (or throws).
693+
694+
If the number of pending handlers exceeds the allowed limit, the request will
695+
be queued (and its streaming body will be paused) and it will return a pending
696+
promise.
697+
Once a pending handler returns (or throws), it will pick the oldest request
698+
from this queue and invokes the next handler (and its streaming body will be
699+
resumed).
700+
701+
The following example shows how this middleware can be used to ensure no more
702+
than 10 handlers will be invoked at once:
703+
704+
```php
705+
$server = new StreamingServer(new MiddlewareRunner([
706+
new LimitConcurrentRequestsMiddleware(10),
707+
$handler
708+
]));
709+
```
710+
711+
Similarly, this middleware is often used in combination with the
712+
[`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
713+
to limit the total number of requests that can be buffered at once:
714+
715+
```php
716+
$server = new StreamingServer(new MiddlewareRunner([
717+
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
718+
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
719+
new RequestBodyParserMiddleware(),
720+
$handler
721+
]));
722+
```
723+
724+
More sophisticated examples include limiting the total number of requests
725+
that can be buffered at once and then ensure the actual request handler only
726+
processes one request after another without any concurrency:
727+
728+
```php
729+
$server = new StreamingServer(new MiddlewareRunner([
730+
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
731+
new RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
732+
new RequestBodyParserMiddleware(),
733+
new LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency)
734+
$handler
735+
]));
736+
```
737+
684738
#### RequestBodyBufferMiddleware
685739

686740
One of the built-in middleware is the `RequestBodyBufferMiddleware` which
@@ -714,10 +768,18 @@ Similarly, this will immediately invoke the next middleware handler for requests
714768
that have an empty request body (such as a simple `GET` request) and requests
715769
that are already buffered (such as due to another middleware).
716770

771+
Note that the given buffer size limit is applied to each request individually.
772+
This means that if you allow a 2 MiB limit and then receive 1000 concurrent
773+
requests, up to 2000 MiB may be allocated for these buffers alone.
774+
As such, it's highly recommended to use this along with the
775+
[`LimitConcurrentRequestsMiddleware`](#limitconcurrentrequestsmiddleware) (see above) to limit
776+
the total number of concurrent requests.
777+
717778
Usage:
718779

719780
```php
720781
$middlewares = new MiddlewareRunner([
782+
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
721783
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
722784
function (ServerRequestInterface $request, callable $next) {
723785
// The body from $request->getBody() is now fully available without the need to stream it
@@ -776,6 +838,7 @@ $handler = function (ServerRequestInterface $request) {
776838
};
777839

778840
$server = new StreamingServer(new MiddlewareRunner([
841+
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
779842
new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB
780843
new RequestBodyParserMiddleware(),
781844
$handler

examples/12-upload.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Psr\Http\Message\UploadedFileInterface;
1212
use React\EventLoop\Factory;
1313
use React\Http\MiddlewareRunner;
14+
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
1415
use React\Http\Middleware\RequestBodyBufferMiddleware;
1516
use React\Http\Middleware\RequestBodyParserMiddleware;
1617
use React\Http\Response;
@@ -121,6 +122,7 @@
121122

122123
// buffer and parse HTTP request body before running our request handler
123124
$server = new StreamingServer(new MiddlewareRunner(array(
125+
new LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers, queue otherwise
124126
new RequestBodyBufferMiddleware(8 * 1024 * 1024), // 8 MiB max, ignore body otherwise
125127
new RequestBodyParserMiddleware(100 * 1024, 1), // 1 file with 100 KiB max, reject upload otherwise
126128
$handler

src/Io/PauseBufferStream.php

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
<?php
2+
3+
namespace React\Http\Io;
4+
5+
use Evenement\EventEmitter;
6+
use React\Stream\ReadableStreamInterface;
7+
use React\Stream\Util;
8+
use React\Stream\WritableStreamInterface;
9+
10+
/**
11+
* [Internal] Pauses a given stream and buffers all events while paused
12+
*
13+
* This class is used to buffer all events that happen on a given stream while
14+
* it is paused. This allows you to pause a stream and no longer watch for any
15+
* of its events. Once the stream is resumed, all buffered events will be
16+
* emitted. Explicitly closing the resulting stream clears all buffers.
17+
*
18+
* Note that this is an internal class only and nothing you should usually care
19+
* about.
20+
*
21+
* @see ReadableStreamInterface
22+
* @internal
23+
*/
24+
class PauseBufferStream extends EventEmitter implements ReadableStreamInterface
25+
{
26+
private $input;
27+
private $closed = false;
28+
private $paused = false;
29+
private $dataPaused = '';
30+
private $endPaused = false;
31+
private $closePaused = false;
32+
private $errorPaused = null;
33+
private $implicit = false;
34+
35+
public function __construct(ReadableStreamInterface $input)
36+
{
37+
$this->input = $input;
38+
39+
$this->input->on('data', array($this, 'handleData'));
40+
$this->input->on('end', array($this, 'handleEnd'));
41+
$this->input->on('error', array($this, 'handleError'));
42+
$this->input->on('close', array($this, 'handleClose'));
43+
}
44+
45+
/**
46+
* pause and remember this was not explicitly from user control
47+
*
48+
* @internal
49+
*/
50+
public function pauseImplicit()
51+
{
52+
$this->pause();
53+
$this->implicit = true;
54+
}
55+
56+
/**
57+
* resume only if this was previously paused implicitly and not explicitly from user control
58+
*
59+
* @internal
60+
*/
61+
public function resumeImplicit()
62+
{
63+
if ($this->implicit) {
64+
$this->resume();
65+
}
66+
}
67+
68+
public function isReadable()
69+
{
70+
return !$this->closed;
71+
}
72+
73+
public function pause()
74+
{
75+
if ($this->closed) {
76+
return;
77+
}
78+
79+
$this->input->pause();
80+
$this->paused = true;
81+
$this->implicit = false;
82+
}
83+
84+
public function resume()
85+
{
86+
if ($this->closed) {
87+
return;
88+
}
89+
90+
$this->paused = false;
91+
$this->implicit = false;
92+
93+
if ($this->dataPaused !== '') {
94+
$this->emit('data', array($this->dataPaused));
95+
$this->dataPaused = '';
96+
}
97+
98+
if ($this->errorPaused) {
99+
$this->emit('error', array($this->errorPaused));
100+
return $this->close();
101+
}
102+
103+
if ($this->endPaused) {
104+
$this->endPaused = false;
105+
$this->emit('end');
106+
return $this->close();
107+
}
108+
109+
if ($this->closePaused) {
110+
$this->closePaused = false;
111+
return $this->close();
112+
}
113+
114+
$this->input->resume();
115+
}
116+
117+
public function pipe(WritableStreamInterface $dest, array $options = array())
118+
{
119+
Util::pipe($this, $dest, $options);
120+
121+
return $dest;
122+
}
123+
124+
public function close()
125+
{
126+
if ($this->closed) {
127+
return;
128+
}
129+
130+
$this->closed = true;
131+
$this->dataPaused = '';
132+
$this->endPaused = $this->closePaused = false;
133+
$this->errorPaused = null;
134+
135+
$this->input->close();
136+
137+
$this->emit('close');
138+
$this->removeAllListeners();
139+
}
140+
141+
/** @internal */
142+
public function handleData($data)
143+
{
144+
if ($this->paused) {
145+
$this->dataPaused .= $data;
146+
return;
147+
}
148+
149+
$this->emit('data', array($data));
150+
}
151+
152+
/** @internal */
153+
public function handleError(\Exception $e)
154+
{
155+
if ($this->paused) {
156+
$this->errorPaused = $e;
157+
return;
158+
}
159+
160+
$this->emit('error', array($e));
161+
$this->close();
162+
}
163+
164+
/** @internal */
165+
public function handleEnd()
166+
{
167+
if ($this->paused) {
168+
$this->endPaused = true;
169+
return;
170+
}
171+
172+
if (!$this->closed) {
173+
$this->emit('end');
174+
$this->close();
175+
}
176+
}
177+
178+
/** @internal */
179+
public function handleClose()
180+
{
181+
if ($this->paused) {
182+
$this->closePaused = true;
183+
return;
184+
}
185+
186+
$this->close();
187+
}
188+
}

0 commit comments

Comments
 (0)