Skip to content

Commit e5ab4ac

Browse files
authored
Merge pull request #296 from clue-labs/happy-limit
Improve performance by avoiding unneeded promise wrapping when concurrency is below limit
2 parents e38a527 + 485769d commit e5ab4ac

2 files changed

Lines changed: 152 additions & 27 deletions

File tree

src/Middleware/LimitConcurrentRequestsMiddleware.php

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
namespace React\Http\Middleware;
44

5+
use Psr\Http\Message\ResponseInterface;
56
use Psr\Http\Message\ServerRequestInterface;
67
use React\Http\Io\HttpBodyStream;
78
use React\Http\Io\PauseBufferStream;
89
use React\Promise;
10+
use React\Promise\PromiseInterface;
911
use React\Promise\Deferred;
1012
use React\Stream\ReadableStreamInterface;
1113

@@ -81,6 +83,35 @@ public function __construct($limit)
8183

8284
public function __invoke(ServerRequestInterface $request, $next)
8385
{
86+
// happy path: simply invoke next request handler if we're below limit
87+
if ($this->pending < $this->limit) {
88+
++$this->pending;
89+
90+
try {
91+
$response = $next($request);
92+
} catch (\Exception $e) {
93+
$this->processQueue();
94+
throw $e;
95+
} catch (\Throwable $e) { // @codeCoverageIgnoreStart
96+
// handle Errors just like Exceptions (PHP 7+ only)
97+
$this->processQueue();
98+
throw $e; // @codeCoverageIgnoreEnd
99+
}
100+
101+
// happy path: if next request handler returned immediately,
102+
// we can simply try to invoke the next queued request
103+
if ($response instanceof ResponseInterface) {
104+
$this->processQueue();
105+
return $response;
106+
}
107+
108+
// if the next handler returns a pending promise, we have to
109+
// await its resolution before invoking next queued request
110+
return $this->await(Promise\resolve($response));
111+
}
112+
113+
// if we reach this point, then this request will need to be queued
114+
// check if the body is streaming, in which case we need to buffer everything
84115
$body = $request->getBody();
85116
if ($body instanceof ReadableStreamInterface) {
86117
// pause actual body to stop emitting data until the handler is called
@@ -110,13 +141,12 @@ public function __invoke(ServerRequestInterface $request, $next)
110141

111142
// queue request and process queue if pending does not exceed limit
112143
$queue[$id] = $deferred;
113-
$this->processQueue();
114144

115145
$that = $this;
116146
$pending = &$this->pending;
117-
return $deferred->promise()->then(function () use ($request, $next, $body, &$pending) {
118-
$pending++;
119-
147+
return $this->await($deferred->promise()->then(function () use ($request, $next, $body, &$pending) {
148+
// invoke next request handler
149+
++$pending;
120150
$ret = $next($request);
121151

122152
// resume readable stream and replay buffered events
@@ -125,13 +155,18 @@ public function __invoke(ServerRequestInterface $request, $next)
125155
}
126156

127157
return $ret;
128-
})->then(function ($response) use ($that, &$pending) {
129-
$pending--;
158+
}));
159+
}
160+
161+
private function await(PromiseInterface $promise)
162+
{
163+
$that = $this;
164+
165+
return $promise->then(function ($response) use ($that) {
130166
$that->processQueue();
131167

132168
return $response;
133-
}, function ($error) use ($that, &$pending) {
134-
$pending--;
169+
}, function ($error) use ($that) {
135170
$that->processQueue();
136171

137172
return Promise\reject($error);
@@ -143,11 +178,8 @@ public function __invoke(ServerRequestInterface $request, $next)
143178
*/
144179
public function processQueue()
145180
{
146-
if ($this->pending >= $this->limit) {
147-
return;
148-
}
149-
150-
if (!$this->queue) {
181+
// skip if we're still above concurrency limit or there's no queued request waiting
182+
if (--$this->pending >= $this->limit || !$this->queue) {
151183
return;
152184
}
153185

tests/Middleware/LimitConcurrentRequestsMiddlewareTest.php

Lines changed: 107 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use React\Stream\ThroughStream;
1212
use React\Tests\Http\TestCase;
1313
use React\Promise\PromiseInterface;
14+
use React\Http\Response;
1415

1516
final class LimitConcurrentRequestsMiddlewareTest extends TestCase
1617
{
@@ -94,13 +95,110 @@ public function testLimitOneRequestConcurrently()
9495
$this->assertTrue($calledC);
9596
}
9697

97-
public function testStreamPauseAndResume()
98+
public function testReturnsResponseDirectlyFromMiddlewareWhenBelowLimit()
99+
{
100+
$middleware = new LimitConcurrentRequestsMiddleware(1);
101+
102+
$response = new Response();
103+
$ret = $middleware(new ServerRequest('GET', 'https://example.com/'), function () use ($response) {
104+
return $response;
105+
});
106+
107+
$this->assertSame($response, $ret);
108+
}
109+
110+
/**
111+
* @expectedException RuntimeException
112+
* @expectedExceptionMessage demo
113+
*/
114+
public function testThrowsExceptionDirectlyFromMiddlewareWhenBelowLimit()
115+
{
116+
$middleware = new LimitConcurrentRequestsMiddleware(1);
117+
118+
$middleware(new ServerRequest('GET', 'https://example.com/'), function () {
119+
throw new \RuntimeException('demo');
120+
});
121+
}
122+
123+
/**
124+
* @requires PHP 7
125+
* @expectedException Error
126+
* @expectedExceptionMessage demo
127+
*/
128+
public function testThrowsErrorDirectlyFromMiddlewareWhenBelowLimit()
129+
{
130+
$middleware = new LimitConcurrentRequestsMiddleware(1);
131+
132+
$middleware(new ServerRequest('GET', 'https://example.com/'), function () {
133+
throw new \Error('demo');
134+
});
135+
}
136+
137+
public function testReturnsPendingPromiseChainedFromMiddlewareWhenBelowLimit()
138+
{
139+
$middleware = new LimitConcurrentRequestsMiddleware(1);
140+
141+
$deferred = new Deferred();
142+
$ret = $middleware(new ServerRequest('GET', 'https://example.com/'), function () use ($deferred) {
143+
return $deferred->promise();
144+
});
145+
146+
$this->assertTrue($ret instanceof PromiseInterface);
147+
}
148+
149+
public function testReturnsPendingPromiseFromMiddlewareWhenAboveLimit()
150+
{
151+
$middleware = new LimitConcurrentRequestsMiddleware(1);
152+
153+
$middleware(new ServerRequest('GET', 'https://example.com/'), function () {
154+
return new Promise(function () { });
155+
});
156+
157+
$ret = $middleware(new ServerRequest('GET', 'https://example.com/'), function () {
158+
return new Response();
159+
});
160+
161+
$this->assertTrue($ret instanceof PromiseInterface);
162+
}
163+
164+
public function testStreamDoesNotPauseOrResumeWhenBelowLimit()
165+
{
166+
$body = $this->getMockBuilder('React\Http\Io\HttpBodyStream')->disableOriginalConstructor()->getMock();
167+
$body->expects($this->never())->method('pause');
168+
$body->expects($this->never())->method('resume');
169+
$limitHandlers = new LimitConcurrentRequestsMiddleware(1);
170+
$limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {});
171+
}
172+
173+
public function testStreamDoesPauseWhenAboveLimit()
174+
{
175+
$body = $this->getMockBuilder('React\Http\Io\HttpBodyStream')->disableOriginalConstructor()->getMock();
176+
$body->expects($this->once())->method('pause');
177+
$body->expects($this->never())->method('resume');
178+
$limitHandlers = new LimitConcurrentRequestsMiddleware(1);
179+
180+
$limitHandlers(new ServerRequest('GET', 'https://example.com'), function () {
181+
return new Promise(function () { });
182+
});
183+
184+
$limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {});
185+
}
186+
187+
public function testStreamDoesPauseAndThenResumeWhenDequeued()
98188
{
99189
$body = $this->getMockBuilder('React\Http\Io\HttpBodyStream')->disableOriginalConstructor()->getMock();
100190
$body->expects($this->once())->method('pause');
101191
$body->expects($this->once())->method('resume');
102192
$limitHandlers = new LimitConcurrentRequestsMiddleware(1);
193+
194+
$deferred = new Deferred();
195+
$limitHandlers(new ServerRequest('GET', 'https://example.com'), function () use ($deferred) {
196+
return $deferred->promise();
197+
});
198+
103199
$limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {});
200+
201+
$deferred->reject();
104202
}
105203

106204
public function testReceivesBufferedRequestSameInstance()
@@ -121,7 +219,7 @@ public function testReceivesBufferedRequestSameInstance()
121219
$this->assertSame($request, $req);
122220
}
123221

124-
public function testReceivesStreamingBodyChangesInstanceWithCustomBodyButSameData()
222+
public function testReceivesStreamingBodyRequestSameInstanceWhenBelowLimit()
125223
{
126224
$stream = new ThroughStream();
127225
$request = new ServerRequest(
@@ -137,15 +235,9 @@ public function testReceivesStreamingBodyChangesInstanceWithCustomBodyButSameDat
137235
$req = $request;
138236
});
139237

140-
$this->assertNotSame($request, $req);
141-
$this->assertInstanceOf('Psr\Http\Message\ServerRequestInterface', $req);
238+
$this->assertSame($request, $req);
142239

143240
$body = $req->getBody();
144-
$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $body);
145-
/* @var $body \React\Stream\ReadableStreamInterface */
146-
147-
$this->assertEquals(5, $body->getSize());
148-
149241
$body->on('data', $this->expectCallableOnce('hello'));
150242
$stream->write('hello');
151243
}
@@ -273,7 +365,7 @@ public function testReceivesNextRequestAfterPreviousHandlerIsCancelled()
273365
$middleware($request, $this->expectCallableOnceWith($request));
274366
}
275367

276-
public function testReceivesNextStreamingBodyWithSameDataAfterPreviousHandlerIsSettled()
368+
public function testReceivesStreamingBodyChangesInstanceWithCustomBodyButSameDataWhenDequeued()
277369
{
278370
$stream = new ThroughStream();
279371
$request = new ServerRequest(
@@ -283,19 +375,20 @@ public function testReceivesNextStreamingBodyWithSameDataAfterPreviousHandlerIsS
283375
new HttpBodyStream($stream, 5)
284376
);
285377

286-
$deferred = new Deferred();
287378
$middleware = new LimitConcurrentRequestsMiddleware(1);
288-
$middleware($request, function () use ($deferred) {
379+
380+
$deferred = new Deferred();
381+
$middleware(new ServerRequest('GET', 'https://example.com/'), function () use ($deferred) {
289382
return $deferred->promise();
290383
});
291384

292-
$deferred->reject(new \RuntimeException());
293-
294385
$req = null;
295386
$middleware($request, function (ServerRequestInterface $request) use (&$req) {
296387
$req = $request;
297388
});
298389

390+
$deferred->reject();
391+
299392
$this->assertNotSame($request, $req);
300393
$this->assertInstanceOf('Psr\Http\Message\ServerRequestInterface', $req);
301394

0 commit comments

Comments
 (0)