Skip to content

Commit 1793f25

Browse files
authored
Merge pull request #8 from clue-labs/close-event
Explicit close of unwrapped stream should not emit error event
2 parents 4ff79e6 + 6c8ee41 commit 1793f25

5 files changed

Lines changed: 68 additions & 6 deletions

File tree

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,19 @@ If the given promise is already settled and does not resolve with an
167167
instance of `ReadableStreamInterface`, then you will not be able to receive
168168
the `error` event.
169169

170+
You can `close()` the resulting stream at any time, which will either try to
171+
`cancel()` the pending promise or try to `close()` the underlying stream.
172+
173+
```php
174+
$promise = startDownloadStream($uri);
175+
176+
$stream = Stream\unwrapReadable($promise);
177+
178+
$loop->addTimer(2.0, function () use ($stream) {
179+
$stream->close();
180+
});
181+
```
182+
170183
### unwrapWritable()
171184

172185
The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap
@@ -211,6 +224,19 @@ If the given promise is already settled and does not resolve with an
211224
instance of `WritableStreamInterface`, then you will not be able to receive
212225
the `error` event.
213226

227+
You can `close()` the resulting stream at any time, which will either try to
228+
`cancel()` the pending promise or try to `close()` the underlying stream.
229+
230+
```php
231+
$promise = startUploadStream($uri);
232+
233+
$stream = Stream\unwrapWritable($promise);
234+
235+
$loop->addTimer(2.0, function () use ($stream) {
236+
$stream->close();
237+
});
238+
```
239+
214240
## Install
215241

216242
The recommended way to install this library is [through Composer](https://getcomposer.org).

src/UnwrapReadableStream.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,11 @@ function (ReadableStreamInterface $stream) use ($out, &$closed) {
7575

7676
return $stream;
7777
},
78-
function ($e) use ($out) {
79-
$out->emit('error', array($e, $out));
80-
$out->close();
78+
function ($e) use ($out, &$closed) {
79+
if (!$closed) {
80+
$out->emit('error', array($e, $out));
81+
$out->close();
82+
}
8183
}
8284
);
8385
}

src/UnwrapWritableStream.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,11 @@ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$endin
8888

8989
return $stream;
9090
},
91-
function ($e) use ($out) {
92-
$out->emit('error', array($e, $out));
93-
$out->close();
91+
function ($e) use ($out, &$closed) {
92+
if (!$closed) {
93+
$out->emit('error', array($e, $out));
94+
$out->close();
95+
}
9496
}
9597
);
9698
}

tests/UnwrapReadableTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,28 @@ public function testClosingStreamMakesItNotReadable()
3131

3232
$stream->on('close', $this->expectCallableOnce());
3333
$stream->on('end', $this->expectCallableNever());
34+
$stream->on('error', $this->expectCallableNever());
3435

3536
$stream->close();
3637

3738
$this->assertFalse($stream->isReadable());
3839
}
3940

41+
public function testClosingRejectingStreamMakesItNotReadable()
42+
{
43+
$promise = Timer\reject(0.001, $this->loop);
44+
$stream = Stream\unwrapReadable($promise);
45+
46+
$stream->on('close', $this->expectCallableOnce());
47+
$stream->on('end', $this->expectCallableNever());
48+
$stream->on('error', $this->expectCallableNever());
49+
50+
$stream->close();
51+
$this->loop->run();
52+
53+
$this->assertFalse($stream->isReadable());
54+
}
55+
4056
public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotReadable()
4157
{
4258
$promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce());

tests/UnwrapWritableTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,27 @@ public function testClosingStreamMakesItNotWritable()
3030
$stream = Stream\unwrapWritable($promise);
3131

3232
$stream->on('close', $this->expectCallableOnce());
33+
$stream->on('error', $this->expectCallableNever());
3334

3435
$stream->close();
3536

3637
$this->assertFalse($stream->isWritable());
3738
}
3839

40+
public function testClosingRejectingStreamMakesItNotWritable()
41+
{
42+
$promise = Timer\reject(0.001, $this->loop);
43+
$stream = Stream\unwrapWritable($promise);
44+
45+
$stream->on('close', $this->expectCallableOnce());
46+
$stream->on('error', $this->expectCallableNever());
47+
48+
$stream->close();
49+
$this->loop->run();
50+
51+
$this->assertFalse($stream->isWritable());
52+
}
53+
3954
public function testClosingStreamWillCancelInputPromiseAndMakeStreamNotWritable()
4055
{
4156
$promise = new \React\Promise\Promise(function () { }, $this->expectCallableOnce());
@@ -247,6 +262,7 @@ public function testEmitsCloseOnlyOnceWhenClosingStreamMultipleTimes()
247262
$stream = Stream\unwrapWritable($promise);
248263

249264
$stream->on('close', $this->expectCallableOnce());
265+
$stream->on('error', $this->expectCallableNever());
250266

251267
$stream->close();
252268
$stream->close();

0 commit comments

Comments
 (0)