Skip to content

Commit 2fd3728

Browse files
committed
Consistent write() semantics and simplified buffering while connecting
Accordingly, the `drain` event now behaves consistently and we no longer need the custom `headers-written` event.
1 parent a577989 commit 2fd3728

3 files changed

Lines changed: 104 additions & 51 deletions

File tree

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,18 @@ Requests are prepared using the ``Client#request()`` method. Body can be
1010
sent with ``Request#write()``. ``Request#end()`` finishes sending the request
1111
(or sends it at all if no body was written).
1212

13-
Request implements WritableStreamInterface, so a Stream can be piped to
14-
it. Response implements ReadableStreamInterface.
15-
13+
Request implements WritableStreamInterface, so a Stream can be piped to it.
1614
Interesting events emitted by Request:
1715

1816
* `response`: The response headers were received from the server and successfully
1917
parsed. The first argument is a Response instance.
18+
* `drain`: The outgoing buffer drained and the response is ready to accept more
19+
data for the next `write()` call.
2020
* `error`: An error occurred.
2121
* `end`: The request is finished. If an error occurred, it is passed as first
2222
argument. Second and third arguments are the Response and the Request.
2323

24+
Response implements ReadableStreamInterface.
2425
Interesting events emitted by Response:
2526

2627
* `data`: Passes a chunk of the response body as first argument and a Response

src/Request.php

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
use GuzzleHttp\Psr7 as gPsr;
77
use React\Socket\ConnectorInterface;
88
use React\Stream\WritableStreamInterface;
9+
use React\Socket\ConnectionInterface;
910

1011
/**
11-
* @event headers-written
1212
* @event response
1313
* @event drain
1414
* @event error
@@ -32,7 +32,7 @@ class Request implements WritableStreamInterface
3232
private $response;
3333
private $state = self::STATE_INIT;
3434

35-
private $pendingWrites = array();
35+
private $pendingWrites = '';
3636

3737
public function __construct(ConnectorInterface $connector, RequestData $requestData)
3838
{
@@ -45,22 +45,19 @@ public function isWritable()
4545
return self::STATE_END > $this->state;
4646
}
4747

48-
public function writeHead()
48+
private function writeHead()
4949
{
50-
if (self::STATE_WRITING_HEAD <= $this->state) {
51-
throw new \LogicException('Headers already written');
52-
}
53-
5450
$this->state = self::STATE_WRITING_HEAD;
5551

5652
$requestData = $this->requestData;
5753
$streamRef = &$this->stream;
5854
$stateRef = &$this->state;
55+
$pendingWrites = &$this->pendingWrites;
5956

6057
$this
6158
->connect()
6259
->done(
63-
function ($stream) use ($requestData, &$streamRef, &$stateRef) {
60+
function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites) {
6461
$streamRef = $stream;
6562

6663
$stream->on('drain', array($this, 'handleDrain'));
@@ -71,11 +68,18 @@ function ($stream) use ($requestData, &$streamRef, &$stateRef) {
7168

7269
$headers = (string) $requestData;
7370

74-
$stream->write($headers);
71+
$more = $stream->write($headers . $pendingWrites);
7572

7673
$stateRef = Request::STATE_HEAD_WRITTEN;
7774

78-
$this->emit('headers-written', array($this));
75+
// clear pending writes if non-empty
76+
if ($pendingWrites !== '') {
77+
$pendingWrites = '';
78+
79+
if ($more) {
80+
$this->emit('drain');
81+
}
82+
}
7983
},
8084
array($this, 'handleError')
8185
);
@@ -84,25 +88,16 @@ function ($stream) use ($requestData, &$streamRef, &$stateRef) {
8488
public function write($data)
8589
{
8690
if (!$this->isWritable()) {
87-
return;
91+
return false;
8892
}
8993

94+
// write directly to connection stream if already available
9095
if (self::STATE_HEAD_WRITTEN <= $this->state) {
9196
return $this->stream->write($data);
9297
}
9398

94-
if (!count($this->pendingWrites)) {
95-
$this->on('headers-written', function ($that) {
96-
foreach ($that->pendingWrites as $pw) {
97-
$that->write($pw);
98-
}
99-
$that->pendingWrites = array();
100-
$that->emit('drain', array($that));
101-
});
102-
}
103-
104-
$this->pendingWrites[] = $data;
105-
99+
// otherwise buffer and try to establish connection
100+
$this->pendingWrites .= $data;
106101
if (self::STATE_WRITING_HEAD > $this->state) {
107102
$this->writeHead();
108103
}
@@ -123,9 +118,10 @@ public function end($data = null)
123118
}
124119
}
125120

121+
/** @internal */
126122
public function handleDrain()
127123
{
128-
$this->emit('drain', array($this));
124+
$this->emit('drain');
129125
}
130126

131127
public function handleData($data)
@@ -207,6 +203,7 @@ public function close()
207203
}
208204

209205
$this->state = self::STATE_END;
206+
$this->pendingWrites = '';
210207

211208
if ($this->stream) {
212209
$this->stream->close();

tests/RequestTest.php

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public function requestShouldEmitErrorIfGuzzleParseThrowsException()
221221

222222
$request->on('error', $handler);
223223

224-
$request->writeHead();
224+
$request->end();
225225
$request->handleData("\r\n\r\n");
226226
}
227227

@@ -253,13 +253,9 @@ public function postRequestShouldSendAPostRequest()
253253
$this->successfulConnectionMock();
254254

255255
$this->stream
256-
->expects($this->at(5))
257-
->method('write')
258-
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#"));
259-
$this->stream
260-
->expects($this->at(6))
256+
->expects($this->once())
261257
->method('write')
262-
->with($this->identicalTo("some post data"));
258+
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome post data$#"));
263259

264260
$factory = $this->createCallableMock();
265261
$factory->expects($this->once())
@@ -285,17 +281,13 @@ public function writeWithAPostRequestShouldSendToTheStream()
285281
$this->stream
286282
->expects($this->at(5))
287283
->method('write')
288-
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#"));
284+
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome$#"));
289285
$this->stream
290286
->expects($this->at(6))
291287
->method('write')
292-
->with($this->identicalTo("some"));
293-
$this->stream
294-
->expects($this->at(7))
295-
->method('write')
296288
->with($this->identicalTo("post"));
297289
$this->stream
298-
->expects($this->at(8))
290+
->expects($this->at(7))
299291
->method('write')
300292
->with($this->identicalTo("data"));
301293

@@ -326,17 +318,56 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent
326318
$this->stream
327319
->expects($this->at(5))
328320
->method('write')
329-
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#"));
321+
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsomepost$#"))
322+
->willReturn(true);
330323
$this->stream
331324
->expects($this->at(6))
332325
->method('write')
333-
->with($this->identicalTo("some"));
326+
->with($this->identicalTo("data"));
327+
328+
$factory = $this->createCallableMock();
329+
$factory->expects($this->once())
330+
->method('__invoke')
331+
->will($this->returnValue($this->response));
332+
333+
$request->setResponseFactory($factory);
334+
335+
$this->assertFalse($request->write("some"));
336+
$this->assertFalse($request->write("post"));
337+
338+
$request->on('drain', $this->expectCallableOnce());
339+
$request->once('drain', function () use ($request) {
340+
$request->write("data");
341+
$request->end();
342+
});
343+
344+
$resolveConnection();
345+
346+
$request->handleData("HTTP/1.0 200 OK\r\n");
347+
$request->handleData("Content-Type: text/plain\r\n");
348+
$request->handleData("\r\nbody");
349+
}
350+
351+
/** @test */
352+
public function writeWithAPostRequestShouldForwardDrainEventIfFirstChunkExceedsBuffer()
353+
{
354+
$requestData = new RequestData('POST', 'http://www.example.com');
355+
$request = new Request($this->connector, $requestData);
356+
357+
$this->stream = $this->getMockBuilder('React\Socket\Connection')
358+
->disableOriginalConstructor()
359+
->setMethods(array('write'))
360+
->getMock();
361+
362+
$resolveConnection = $this->successfulAsyncConnectionMock();
363+
334364
$this->stream
335-
->expects($this->at(7))
365+
->expects($this->at(0))
336366
->method('write')
337-
->with($this->identicalTo("post"));
367+
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsomepost$#"))
368+
->willReturn(false);
338369
$this->stream
339-
->expects($this->at(8))
370+
->expects($this->at(1))
340371
->method('write')
341372
->with($this->identicalTo("data"));
342373

@@ -350,12 +381,14 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent
350381
$this->assertFalse($request->write("some"));
351382
$this->assertFalse($request->write("post"));
352383

384+
$request->on('drain', $this->expectCallableOnce());
353385
$request->once('drain', function () use ($request) {
354386
$request->write("data");
355387
$request->end();
356388
});
357389

358390
$resolveConnection();
391+
$this->stream->emit('drain');
359392

360393
$request->handleData("HTTP/1.0 200 OK\r\n");
361394
$request->handleData("Content-Type: text/plain\r\n");
@@ -373,17 +406,13 @@ public function pipeShouldPipeDataIntoTheRequestBody()
373406
$this->stream
374407
->expects($this->at(5))
375408
->method('write')
376-
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#"));
409+
->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome$#"));
377410
$this->stream
378411
->expects($this->at(6))
379412
->method('write')
380-
->with($this->identicalTo("some"));
381-
$this->stream
382-
->expects($this->at(7))
383-
->method('write')
384413
->with($this->identicalTo("post"));
385414
$this->stream
386-
->expects($this->at(8))
415+
->expects($this->at(7))
387416
->method('write')
388417
->with($this->identicalTo("data"));
389418

@@ -424,6 +453,32 @@ public function endShouldOnlyAcceptScalars()
424453
$request->end(array());
425454
}
426455

456+
/**
457+
* @test
458+
*/
459+
public function closeShouldEmitCloseEvent()
460+
{
461+
$requestData = new RequestData('POST', 'http://www.example.com');
462+
$request = new Request($this->connector, $requestData);
463+
464+
$request->on('close', $this->expectCallableOnce());
465+
$request->close();
466+
}
467+
468+
/**
469+
* @test
470+
*/
471+
public function writeAfterCloseReturnsFalse()
472+
{
473+
$requestData = new RequestData('POST', 'http://www.example.com');
474+
$request = new Request($this->connector, $requestData);
475+
476+
$request->close();
477+
478+
$this->assertFalse($request->isWritable());
479+
$this->assertFalse($request->write('nope'));
480+
}
481+
427482
/** @test */
428483
public function requestShouldRelayErrorEventsFromResponse()
429484
{

0 commit comments

Comments
 (0)