Skip to content

Commit 27f45f6

Browse files
committed
Implement waterfall()
1 parent 1b7284b commit 27f45f6

3 files changed

Lines changed: 105 additions & 1 deletion

File tree

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ $loop->run();
5858

5959
## Todo
6060

61-
* Implement waterfall()
6261
* Implement queue()
6362

6463
## Tests

src/React/Async/Util.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,28 @@ public static function parallel($tasks, $callback, $errback)
7171
call_user_func($task, $taskCallback, $taskErrback);
7272
}
7373
}
74+
75+
public static function waterfall($tasks, $callback, $errback)
76+
{
77+
$taskCallback = function () use (&$next) {
78+
call_user_func_array($next, func_get_args());
79+
};
80+
81+
$done = function () use ($callback) {
82+
call_user_func_array($callback, func_get_args());
83+
};
84+
85+
$next = function () use (&$tasks, $taskCallback, $errback, $done) {
86+
if (0 === count($tasks)) {
87+
call_user_func_array($done, func_get_args());
88+
return;
89+
}
90+
91+
$task = array_shift($tasks);
92+
$args = array_merge(func_get_args(), array($taskCallback, $errback));
93+
call_user_func_array($task, $args);
94+
};
95+
96+
$next();
97+
}
7498
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
namespace React\Tests\Async;
4+
5+
use React\Async\Util;
6+
7+
class UtilWaterfallTest extends TestCase
8+
{
9+
public function testWaterfallWithoutTasks()
10+
{
11+
$tasks = array();
12+
13+
$callback = $this->createCallableMock($this->once(), array());
14+
$errback = $this->createCallableMock($this->never());
15+
16+
Util::waterfall($tasks, $callback, $errback);
17+
}
18+
19+
public function testWaterfallWithTasks()
20+
{
21+
$loop = new \React\EventLoop\StreamSelectLoop();
22+
23+
$tasks = array(
24+
function ($callback, $errback) use ($loop) {
25+
$loop->addTimer(0.05, function () use ($callback) {
26+
$callback('foo');
27+
});
28+
},
29+
function ($foo, $callback, $errback) use ($loop) {
30+
$loop->addTimer(0.05, function () use ($callback, $foo) {
31+
$callback($foo.'bar');
32+
});
33+
},
34+
function ($bar, $callback, $errback) use ($loop) {
35+
$loop->addTimer(0.05, function () use ($callback, $bar) {
36+
$callback($bar.'baz');
37+
});
38+
},
39+
);
40+
41+
$callback = $this->createCallableMock($this->once(), 'foobarbaz');
42+
$errback = $this->createCallableMock($this->never());
43+
44+
Util::waterfall($tasks, $callback, $errback);
45+
46+
$timer = new Timer($this);
47+
$timer->start();
48+
49+
$loop->run();
50+
51+
$timer->stop();
52+
$timer->assertInRange(0.15, 0.30);
53+
}
54+
55+
public function testWaterfallWithError()
56+
{
57+
$called = 0;
58+
59+
$tasks = array(
60+
function ($callback, $errback) use (&$called) {
61+
$callback('foo');
62+
$called++;
63+
},
64+
function ($foo, $callback, $errback) {
65+
$e = new \RuntimeException('whoops');
66+
$errback($e);
67+
},
68+
function ($callback, $errback) use (&$called) {
69+
$callback('bar');
70+
$called++;
71+
},
72+
);
73+
74+
$callback = $this->createCallableMock($this->never());
75+
$errback = $this->createCallableMock($this->once());
76+
77+
Util::waterfall($tasks, $callback, $errback);
78+
79+
$this->assertSame(1, $called);
80+
}
81+
}

0 commit comments

Comments
 (0)