33namespace React \Async ;
44
55use React \EventLoop \Loop ;
6+ use React \Promise \CancellablePromiseInterface ;
67use React \Promise \Deferred ;
78use React \Promise \PromiseInterface ;
89
@@ -96,46 +97,53 @@ function ($error) use (&$exception, &$rejected, &$wait) {
9697 */
9798function parallel (array $ tasks )
9899{
99- $ deferred = new Deferred ();
100- $ results = array ();
101- $ errors = array ();
102-
103- $ done = function () use (&$ results , &$ errors , $ deferred ) {
104- if (count ($ errors )) {
105- $ deferred ->reject (array_shift ($ errors ));
106- return ;
100+ $ pending = array ();
101+ $ deferred = new Deferred (function () use (&$ pending ) {
102+ foreach ($ pending as $ promise ) {
103+ if ($ promise instanceof CancellablePromiseInterface) {
104+ $ promise ->cancel ();
105+ }
107106 }
108-
109- $ deferred ->resolve ($ results );
110- };
107+ $ pending = array ();
108+ });
109+ $ results = array ();
110+ $ errored = false ;
111111
112112 $ numTasks = count ($ tasks );
113-
114113 if (0 === $ numTasks ) {
115- $ done ( );
114+ $ deferred -> resolve ( $ results );
116115 }
117116
118- $ checkDone = function () use (&$ results , &$ errors , $ numTasks , $ done ) {
119- if ($ numTasks === count ($ results ) + count ($ errors )) {
120- $ done ();
121- }
122- };
117+ $ taskErrback = function ($ error ) use (&$ pending , $ deferred , &$ errored ) {
118+ $ errored = true ;
119+ $ deferred ->reject ($ error );
123120
124- $ taskErrback = function ($ error ) use (&$ errors , $ checkDone ) {
125- $ errors [] = $ error ;
126- $ checkDone ();
121+ foreach ($ pending as $ promise ) {
122+ if ($ promise instanceof CancellablePromiseInterface) {
123+ $ promise ->cancel ();
124+ }
125+ }
126+ $ pending = array ();
127127 };
128128
129129 foreach ($ tasks as $ i => $ task ) {
130- $ taskCallback = function ($ result ) use (&$ results , $ i , $ checkDone ) {
130+ $ taskCallback = function ($ result ) use (&$ results , & $ pending , $ numTasks , $ i , $ deferred ) {
131131 $ results [$ i ] = $ result ;
132- $ checkDone ();
132+
133+ if (count ($ results ) === $ numTasks ) {
134+ $ deferred ->resolve ($ results );
135+ }
133136 };
134137
135138 $ promise = call_user_func ($ task );
136139 assert ($ promise instanceof PromiseInterface);
140+ $ pending [$ i ] = $ promise ;
137141
138142 $ promise ->then ($ taskCallback , $ taskErrback );
143+
144+ if ($ errored ) {
145+ break ;
146+ }
139147 }
140148
141149 return $ deferred ->promise ();
@@ -147,7 +155,13 @@ function parallel(array $tasks)
147155 */
148156function series (array $ tasks )
149157{
150- $ deferred = new Deferred ();
158+ $ pending = null ;
159+ $ deferred = new Deferred (function () use (&$ pending ) {
160+ if ($ pending instanceof CancellablePromiseInterface) {
161+ $ pending ->cancel ();
162+ }
163+ $ pending = null ;
164+ });
151165 $ results = array ();
152166
153167 /** @var callable():void $next */
@@ -156,7 +170,7 @@ function series(array $tasks)
156170 $ next ();
157171 };
158172
159- $ next = function () use (&$ tasks , $ taskCallback , $ deferred , &$ results ) {
173+ $ next = function () use (&$ tasks , $ taskCallback , $ deferred , &$ results, & $ pending ) {
160174 if (0 === count ($ tasks )) {
161175 $ deferred ->resolve ($ results );
162176 return ;
@@ -165,6 +179,7 @@ function series(array $tasks)
165179 $ task = array_shift ($ tasks );
166180 $ promise = call_user_func ($ task );
167181 assert ($ promise instanceof PromiseInterface);
182+ $ pending = $ promise ;
168183
169184 $ promise ->then ($ taskCallback , array ($ deferred , 'reject ' ));
170185 };
@@ -180,10 +195,16 @@ function series(array $tasks)
180195 */
181196function waterfall (array $ tasks )
182197{
183- $ deferred = new Deferred ();
198+ $ pending = null ;
199+ $ deferred = new Deferred (function () use (&$ pending ) {
200+ if ($ pending instanceof CancellablePromiseInterface) {
201+ $ pending ->cancel ();
202+ }
203+ $ pending = null ;
204+ });
184205
185206 /** @var callable $next */
186- $ next = function ($ value = null ) use (&$ tasks , &$ next , $ deferred ) {
207+ $ next = function ($ value = null ) use (&$ tasks , &$ next , $ deferred, & $ pending ) {
187208 if (0 === count ($ tasks )) {
188209 $ deferred ->resolve ($ value );
189210 return ;
@@ -192,6 +213,7 @@ function waterfall(array $tasks)
192213 $ task = array_shift ($ tasks );
193214 $ promise = call_user_func_array ($ task , func_get_args ());
194215 assert ($ promise instanceof PromiseInterface);
216+ $ pending = $ promise ;
195217
196218 $ promise ->then ($ next , array ($ deferred , 'reject ' ));
197219 };
0 commit comments