Skip to content

Commit c15a687

Browse files
WyriHaximusclue
authored andcommitted
Skeleton for LimitHandlersMiddleware
1 parent 54033d8 commit c15a687

2 files changed

Lines changed: 174 additions & 0 deletions

File tree

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
namespace React\Http\Middleware;
4+
5+
use Psr\Http\Message\ServerRequestInterface;
6+
use React\Promise\Deferred;
7+
use React\Stream\ReadableStreamInterface;
8+
use SplQueue;
9+
10+
final class LimitHandlersMiddleware
11+
{
12+
private $limit;
13+
private $pending = 0;
14+
private $queued;
15+
16+
/**
17+
* @param int $limit Maximum amount of concurrent requests handled.
18+
*
19+
* For example when $limit is set to 10, 10 requests will flow to $next
20+
* while more incoming requests have to wait until one is done.
21+
*/
22+
public function __construct($limit)
23+
{
24+
$this->limit = $limit;
25+
$this->queued = new SplQueue();
26+
}
27+
28+
public function __invoke(ServerRequestInterface $request, $next)
29+
{
30+
$body = $request->getBody();
31+
if ($body instanceof ReadableStreamInterface) {
32+
$body->pause();
33+
}
34+
$deferred = new Deferred();
35+
$this->queued->enqueue($deferred);
36+
37+
$this->processQueue();
38+
39+
$that = $this;
40+
$pending = &$this->pending;
41+
return $deferred->promise()->then(function () use ($request, $next, &$pending) {
42+
$pending++;
43+
$body = $request->getBody();
44+
if ($body instanceof ReadableStreamInterface) {
45+
$body->resume();
46+
}
47+
return $next($request);
48+
})->then(function ($response) use ($that, &$pending) {
49+
$pending--;
50+
$that->processQueue();
51+
return $response;
52+
}, function ($error) use ($that, &$pending) {
53+
$pending--;
54+
$that->processQueue();
55+
return $error;
56+
});
57+
}
58+
59+
/**
60+
* @internal
61+
*/
62+
public function processQueue()
63+
{
64+
if ($this->pending >= $this->limit) {
65+
return;
66+
}
67+
68+
if ($this->queued->count() === 0) {
69+
return;
70+
}
71+
72+
$this->queued->dequeue()->resolve();
73+
}
74+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
namespace React\Tests\Http\Middleware;
4+
5+
use React\Http\Middleware\LimitHandlersMiddleware;
6+
use React\Http\Io\ServerRequest;
7+
use React\Promise\Deferred;
8+
use React\Tests\Http\TestCase;
9+
10+
final class LimitHandlersMiddlewareTest extends TestCase
11+
{
12+
public function testLimitOneRequestConcurrently()
13+
{
14+
/**
15+
* The first request
16+
*/
17+
$requestA = new ServerRequest('GET', 'https://example.com/');
18+
$deferredA = new Deferred();
19+
$calledA = false;
20+
$nextA = function () use (&$calledA, $deferredA) {
21+
$calledA = true;
22+
return $deferredA->promise();
23+
};
24+
25+
/**
26+
* The second request
27+
*/
28+
$requestB = new ServerRequest('GET', 'https://www.example.com/');
29+
$deferredB = new Deferred();
30+
$calledB = false;
31+
$nextB = function () use (&$calledB, $deferredB) {
32+
$calledB = true;
33+
return $deferredB->promise();
34+
};
35+
36+
/**
37+
* The third request
38+
*/
39+
$requestC = new ServerRequest('GET', 'https://www.example.com/');
40+
$calledC = false;
41+
$nextC = function () use (&$calledC) {
42+
$calledC = true;
43+
};
44+
45+
/**
46+
* The handler
47+
*
48+
*/
49+
$limitHandlers = new LimitHandlersMiddleware(1);
50+
51+
$this->assertFalse($calledA);
52+
$this->assertFalse($calledB);
53+
$this->assertFalse($calledC);
54+
55+
$limitHandlers($requestA, $nextA);
56+
57+
$this->assertTrue($calledA);
58+
$this->assertFalse($calledB);
59+
$this->assertFalse($calledC);
60+
61+
$limitHandlers($requestB, $nextB);
62+
63+
$this->assertTrue($calledA);
64+
$this->assertFalse($calledB);
65+
$this->assertFalse($calledC);
66+
67+
$limitHandlers($requestC, $nextC);
68+
69+
$this->assertTrue($calledA);
70+
$this->assertFalse($calledB);
71+
$this->assertFalse($calledC);
72+
73+
/**
74+
* Ensure resolve frees up a slot
75+
*/
76+
$deferredA->resolve();
77+
78+
$this->assertTrue($calledA);
79+
$this->assertTrue($calledB);
80+
$this->assertFalse($calledC);
81+
82+
/**
83+
* Ensure reject also frees up a slot
84+
*/
85+
$deferredB->reject();
86+
87+
$this->assertTrue($calledA);
88+
$this->assertTrue($calledB);
89+
$this->assertTrue($calledC);
90+
}
91+
92+
public function testStreamPauseAndResume()
93+
{
94+
$body = $this->getMockBuilder('React\Http\Io\HttpBodyStream')->disableOriginalConstructor()->getMock();
95+
$body->expects($this->once())->method('pause');
96+
$body->expects($this->once())->method('resume');
97+
$limitHandlers = new LimitHandlersMiddleware(1);
98+
$limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {});
99+
}
100+
}

0 commit comments

Comments
 (0)