Skip to content

Commit 05e5fad

Browse files
committed
Add StreamingRequestMiddleware to stream incoming requests
This middleware can be used to process incoming requests with a streaming request body (without buffering). This will replace the existing `StreamingServer` class.
1 parent ebd66e8 commit 05e5fad

5 files changed

Lines changed: 220 additions & 22 deletions

File tree

README.md

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
Event-driven, streaming plaintext HTTP and secure HTTPS server for [ReactPHP](https://reactphp.org/).
66

7-
**Table of Contents**
7+
**Table of contents**
88

99
* [Quickstart example](#quickstart-example)
1010
* [Usage](#usage)
@@ -15,13 +15,13 @@ Event-driven, streaming plaintext HTTP and secure HTTPS server for [ReactPHP](ht
1515
* [Request parameters](#request-parameters)
1616
* [Query parameters](#query-parameters)
1717
* [Request body](#request-body)
18-
* [Streaming request](#streaming-request)
18+
* [Streaming incoming request](#streaming-incoming-request)
1919
* [Request method](#request-method)
2020
* [Cookie parameters](#cookie-parameters)
2121
* [Invalid request](#invalid-request)
2222
* [Response](#response)
2323
* [Deferred response](#deferred-response)
24-
* [Streaming response](#streaming-response)
24+
* [Streaming outgoing response](#streaming-outgoing-response)
2525
* [Response length](#response-length)
2626
* [Invalid response](#invalid-response)
2727
* [Default response headers](#default-response-headers)
@@ -220,7 +220,7 @@ in memory. It will invoke the request handler function once the HTTP request
220220
headers have been received, i.e. before receiving the potentially much larger
221221
HTTP request body. This means the [request](#request) passed to your request
222222
handler function may not be fully compatible with PSR-7. See also
223-
[streaming request](#streaming-request) below for more details.
223+
[streaming incoming request](#streaming-incoming-request) below for more details.
224224

225225
### listen()
226226

@@ -389,7 +389,7 @@ This includes the parsed request body and any file uploads.
389389

390390
> If you're using the advanced [`StreamingServer`](#streamingserver) class, jump
391391
to the next chapter to learn more about how to process a
392-
[streaming request](#streaming-request).
392+
[streaming incoming request](#streaming-incoming-request).
393393

394394
As stated above, each incoming HTTP request is always represented by the
395395
[PSR-7 `ServerRequestInterface`](https://www.php-fig.org/psr/psr-7/#321-psrhttpmessageserverrequestinterface).
@@ -485,7 +485,9 @@ header or when using `Transfer-Encoding: chunked` for HTTP/1.1 requests.
485485
intermediary `HTTP/1.1 100 Continue` response to the client. This ensures you
486486
will receive the request body without a delay as expected.
487487

488-
#### Streaming request
488+
#### Streaming incoming request
489+
490+
<a id="streaming-request"></a><!-- legacy fragment id -->
489491

490492
If you're using the advanced [`StreamingServer`](#streamingserver), the
491493
request object will be processed once the request headers have been received.
@@ -784,7 +786,9 @@ The promise cancellation handler can be used to clean up any pending resources
784786
allocated in this case (if applicable).
785787
If a promise is resolved after the client closes, it will simply be ignored.
786788

787-
#### Streaming response
789+
#### Streaming outgoing response
790+
791+
<a id="streaming-response"></a><!-- legacy fragment id -->
788792

789793
The `Response` class in this project supports to add an instance which implements the
790794
[ReactPHP ReadableStreamInterface](https://github.com/reactphp/stream#readablestreaminterface)
@@ -897,7 +901,7 @@ $server = new Server(function (ServerRequestInterface $request) {
897901
```
898902

899903
If the response body size is unknown, a `Content-Length` response header can not
900-
be added automatically. When using a [streaming response](#streaming-response)
904+
be added automatically. When using a [streaming outgoing response](#streaming-outgoing-response)
901905
without an explicit `Content-Length` response header, outgoing HTTP/1.1 response
902906
messages will automatically use `Transfer-Encoding: chunked` while legacy HTTP/1.0
903907
response messages will contain the plain response body. If you know the length
@@ -959,8 +963,8 @@ $server->on('error', function (Exception $e) {
959963
Note that the server will also emit an `error` event if the client sends an
960964
invalid HTTP request that never reaches your request handler function. See
961965
also [invalid request](#invalid-request) for more details.
962-
Additionally, a [streaming request](#streaming-request) body can also emit
963-
an `error` event on the request body.
966+
Additionally, a [streaming incoming request](#streaming-incoming-request) body
967+
can also emit an `error` event on the request body.
964968

965969
The server will only send a very generic `500` (Interval Server Error) HTTP
966970
error response without any further details to the client if an unhandled
@@ -1199,6 +1203,64 @@ feel free to add it to this list.
11991203

12001204
### React\Http\Middleware
12011205

1206+
#### StreamingRequestMiddleware
1207+
1208+
The `StreamingRequestMiddleware` can be used to
1209+
process incoming requests with a streaming request body (without buffering).
1210+
1211+
This allows you to process requests of any size without buffering the request
1212+
body in memory. Instead, it will represent the request body as a
1213+
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
1214+
that emit chunks of incoming data as it is received:
1215+
1216+
```php
1217+
$server = new React\Http\Server(array(
1218+
new React\Http\Middleware\StreamingRequestMiddleware(),
1219+
function (Psr\Http\Message\ServerRequestInterface $request) {
1220+
$body = $request->getBody();
1221+
assert($body instanceof Psr\Http\Message\StreamInterface);
1222+
assert($body instanceof React\Stream\ReadableStreamInterface);
1223+
1224+
return new React\Promise\Promise(function ($resolve) use ($body) {
1225+
$bytes = 0;
1226+
$body->on('data', function ($chunk) use (&$bytes) {
1227+
$bytes += \count($chunk);
1228+
});
1229+
$body->on('close', function () use (&$bytes, $resolve) {
1230+
$resolve(new React\Http\Response(
1231+
200,
1232+
[],
1233+
"Received $bytes bytes\n"
1234+
));
1235+
});
1236+
});
1237+
}
1238+
));
1239+
```
1240+
1241+
See also [streaming incoming request](#streaming-incoming-request)
1242+
for more details.
1243+
1244+
Additionally, this middleware can be used in combination with the
1245+
[`LimitConcurrentRequestsMiddleware`](#limitconcurrentrequestsmiddleware) and
1246+
[`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
1247+
to explicitly configure the total number of requests that can be handled at
1248+
once:
1249+
1250+
```php
1251+
$server = new React\Http\Server(array(
1252+
new React\Http\Middleware\StreamingRequestMiddleware(),
1253+
new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
1254+
new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
1255+
new React\Http\Middleware\RequestBodyParserMiddleware(),
1256+
$handler
1257+
));
1258+
```
1259+
1260+
> Internally, this class is used as a "marker" to not trigger the default
1261+
request buffering behavior in the `Server`. It does not implement any logic
1262+
on its own.
1263+
12021264
#### LimitConcurrentRequestsMiddleware
12031265

12041266
The `LimitConcurrentRequestsMiddleware` can be used to
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
namespace React\Http\Middleware;
4+
5+
use Psr\Http\Message\ServerRequestInterface;
6+
7+
/**
8+
* Process incoming requests with a streaming request body (without buffering).
9+
*
10+
* This allows you to process requests of any size without buffering the request
11+
* body in memory. Instead, it will represent the request body as a
12+
* [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
13+
* that emit chunks of incoming data as it is received:
14+
*
15+
* ```php
16+
* $server = new React\Http\Server(array(
17+
* new React\Http\Middleware\StreamingRequestMiddleware(),
18+
* function (Psr\Http\Message\ServerRequestInterface $request) {
19+
* $body = $request->getBody();
20+
* assert($body instanceof Psr\Http\Message\StreamInterface);
21+
* assert($body instanceof React\Stream\ReadableStreamInterface);
22+
*
23+
* return new React\Promise\Promise(function ($resolve) use ($body) {
24+
* $bytes = 0;
25+
* $body->on('data', function ($chunk) use (&$bytes) {
26+
* $bytes += \count($chunk);
27+
* });
28+
* $body->on('close', function () use (&$bytes, $resolve) {
29+
* $resolve(new React\Http\Response(
30+
* 200,
31+
* [],
32+
* "Received $bytes bytes\n"
33+
* ));
34+
* });
35+
* });
36+
* }
37+
* ));
38+
* ```
39+
*
40+
* See also [streaming incoming request](../../README.md#streaming-incoming-request)
41+
* for more details.
42+
*
43+
* Additionally, this middleware can be used in combination with the
44+
* [`LimitConcurrentRequestsMiddleware`](#limitconcurrentrequestsmiddleware) and
45+
* [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
46+
* to explicitly configure the total number of requests that can be handled at
47+
* once:
48+
*
49+
* ```php
50+
* $server = new React\Http\Server(array(
51+
* new React\Http\Middleware\StreamingRequestMiddleware(),
52+
* new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
53+
* new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
54+
* new React\Http\Middleware\RequestBodyParserMiddleware(),
55+
* $handler
56+
* ));
57+
* ```
58+
*
59+
* > Internally, this class is used as a "marker" to not trigger the default
60+
* request buffering behavior in the `Server`. It does not implement any logic
61+
* on its own.
62+
*/
63+
final class StreamingRequestMiddleware
64+
{
65+
public function __invoke(ServerRequestInterface $request, $next)
66+
{
67+
return $next($request);
68+
}
69+
}

src/Server.php

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Evenement\EventEmitter;
66
use React\Http\Io\IniUtil;
77
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
8+
use React\Http\Middleware\StreamingRequestMiddleware;
89
use React\Http\Middleware\RequestBodyBufferMiddleware;
910
use React\Http\Middleware\RequestBodyParserMiddleware;
1011
use React\Socket\ServerInterface;
@@ -126,19 +127,29 @@ public function __construct($requestHandler)
126127
throw new \InvalidArgumentException('Invalid request handler given');
127128
}
128129

130+
$streaming = false;
131+
foreach ((array) $requestHandler as $handler) {
132+
if ($handler instanceof StreamingRequestMiddleware) {
133+
$streaming = true;
134+
break;
135+
}
136+
}
137+
129138
$middleware = array();
130-
$middleware[] = new LimitConcurrentRequestsMiddleware(
131-
$this->getConcurrentRequestsLimit(\ini_get('memory_limit'), \ini_get('post_max_size'))
132-
);
133-
$middleware[] = new RequestBodyBufferMiddleware();
134-
// Checking for an empty string because that is what a boolean
135-
// false is returned as by ini_get depending on the PHP version.
136-
// @link http://php.net/manual/en/ini.core.php#ini.enable-post-data-reading
137-
// @link http://php.net/manual/en/function.ini-get.php#refsect1-function.ini-get-notes
138-
// @link https://3v4l.org/qJtsa
139-
$enablePostDataReading = \ini_get('enable_post_data_reading');
140-
if ($enablePostDataReading !== '') {
141-
$middleware[] = new RequestBodyParserMiddleware();
139+
if (!$streaming) {
140+
$middleware[] = new LimitConcurrentRequestsMiddleware(
141+
$this->getConcurrentRequestsLimit(\ini_get('memory_limit'), \ini_get('post_max_size'))
142+
);
143+
$middleware[] = new RequestBodyBufferMiddleware();
144+
// Checking for an empty string because that is what a boolean
145+
// false is returned as by ini_get depending on the PHP version.
146+
// @link http://php.net/manual/en/ini.core.php#ini.enable-post-data-reading
147+
// @link http://php.net/manual/en/function.ini-get.php#refsect1-function.ini-get-notes
148+
// @link https://3v4l.org/qJtsa
149+
$enablePostDataReading = \ini_get('enable_post_data_reading');
150+
if ($enablePostDataReading !== '') {
151+
$middleware[] = new RequestBodyParserMiddleware();
152+
}
142153
}
143154

144155
if (\is_callable($requestHandler)) {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace React\Tests\Http\Middleware;
4+
5+
use React\Http\Middleware\StreamingRequestMiddleware;
6+
use React\Http\Io\ServerRequest;
7+
use React\Http\Response;
8+
use React\Tests\Http\TestCase;
9+
10+
class StreamingRequestMiddlewareTest extends TestCase
11+
{
12+
public function testInvokeMiddlewareReturnsResponseFromFollowingHandler()
13+
{
14+
$middleware = new StreamingRequestMiddleware();
15+
16+
$response = new Response();
17+
$ret = $middleware(new ServerRequest('GET', 'https://example.com/'), function () use ($response) {
18+
return $response;
19+
});
20+
21+
$this->assertSame($response, $ret);
22+
}
23+
}

tests/ServerTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
use React\Promise\Deferred;
99
use Clue\React\Block;
1010
use React\Promise;
11+
use React\Http\Middleware\StreamingRequestMiddleware;
12+
use React\Stream\ReadableStreamInterface;
1113

1214
final class ServerTest extends TestCase
1315
{
@@ -142,6 +144,37 @@ public function testPostFileUpload()
142144
$this->assertSame("hello\r\n", (string)$files['file']->getStream());
143145
}
144146

147+
public function testServerReceivesBufferedRequestByDefault()
148+
{
149+
$streaming = null;
150+
$server = new Server(function (ServerRequestInterface $request) use (&$streaming) {
151+
$streaming = $request->getBody() instanceof ReadableStreamInterface;
152+
});
153+
154+
$server->listen($this->socket);
155+
$this->socket->emit('connection', array($this->connection));
156+
$this->connection->emit('data', array("GET / HTTP/1.0\r\n\r\n"));
157+
158+
$this->assertEquals(false, $streaming);
159+
}
160+
161+
public function testServerWithStreamingRequestMiddlewareReceivesStreamingRequest()
162+
{
163+
$streaming = null;
164+
$server = new Server(array(
165+
new StreamingRequestMiddleware(),
166+
function (ServerRequestInterface $request) use (&$streaming) {
167+
$streaming = $request->getBody() instanceof ReadableStreamInterface;
168+
}
169+
));
170+
171+
$server->listen($this->socket);
172+
$this->socket->emit('connection', array($this->connection));
173+
$this->connection->emit('data', array("GET / HTTP/1.0\r\n\r\n"));
174+
175+
$this->assertEquals(true, $streaming);
176+
}
177+
145178
public function testForwardErrors()
146179
{
147180
$exception = new \Exception();

0 commit comments

Comments
 (0)