@@ -297,19 +297,10 @@ function parallel(iterable $tasks): PromiseInterface
297297 $ pending = [];
298298 });
299299 $ results = [];
300- $ errored = false ;
300+ $ continue = true ;
301301
302- if (!\is_array ($ tasks )) {
303- $ tasks = \iterator_to_array ($ tasks );
304- }
305-
306- $ numTasks = count ($ tasks );
307- if (0 === $ numTasks ) {
308- $ deferred ->resolve ($ results );
309- }
310-
311- $ taskErrback = function ($ error ) use (&$ pending , $ deferred , &$ errored ) {
312- $ errored = true ;
302+ $ taskErrback = function ($ error ) use (&$ pending , $ deferred , &$ continue ) {
303+ $ continue = false ;
313304 $ deferred ->reject ($ error );
314305
315306 foreach ($ pending as $ promise ) {
@@ -321,25 +312,31 @@ function parallel(iterable $tasks): PromiseInterface
321312 };
322313
323314 foreach ($ tasks as $ i => $ task ) {
324- $ taskCallback = function ($ result ) use (&$ results , &$ pending , $ numTasks , $ i , $ deferred ) {
315+ $ taskCallback = function ($ result ) use (&$ results , &$ pending , & $ continue , $ i , $ deferred ) {
325316 $ results [$ i ] = $ result ;
317+ unset($ pending [$ i ]);
326318
327- if (count ( $ results ) === $ numTasks ) {
319+ if (! $ pending && ! $ continue ) {
328320 $ deferred ->resolve ($ results );
329321 }
330322 };
331323
332- $ promise = call_user_func ($ task );
324+ $ promise = \ call_user_func ($ task );
333325 assert ($ promise instanceof PromiseInterface);
334326 $ pending [$ i ] = $ promise ;
335327
336328 $ promise ->then ($ taskCallback , $ taskErrback );
337329
338- if ($ errored ) {
330+ if (! $ continue ) {
339331 break ;
340332 }
341333 }
342334
335+ $ continue = false ;
336+ if (!$ pending ) {
337+ $ deferred ->resolve ($ results );
338+ }
339+
343340 return $ deferred ->promise ();
344341}
345342
@@ -358,8 +355,9 @@ function series(iterable $tasks): PromiseInterface
358355 });
359356 $ results = [];
360357
361- if (!\is_array ($ tasks )) {
362- $ tasks = \iterator_to_array ($ tasks );
358+ if ($ tasks instanceof \IteratorAggregate) {
359+ $ tasks = $ tasks ->getIterator ();
360+ assert ($ tasks instanceof \Iterator);
363361 }
364362
365363 /** @var callable():void $next */
@@ -369,13 +367,19 @@ function series(iterable $tasks): PromiseInterface
369367 };
370368
371369 $ next = function () use (&$ tasks , $ taskCallback , $ deferred , &$ results , &$ pending ) {
372- if (0 === count ( $ tasks) ) {
370+ if ($ tasks instanceof \Iterator ? ! $ tasks-> valid () : ! $ tasks ) {
373371 $ deferred ->resolve ($ results );
374372 return ;
375373 }
376374
377- $ task = array_shift ($ tasks );
378- $ promise = call_user_func ($ task );
375+ if ($ tasks instanceof \Iterator) {
376+ $ task = $ tasks ->current ();
377+ $ tasks ->next ();
378+ } else {
379+ $ task = \array_shift ($ tasks );
380+ }
381+
382+ $ promise = \call_user_func ($ task );
379383 assert ($ promise instanceof PromiseInterface);
380384 $ pending = $ promise ;
381385
@@ -401,19 +405,26 @@ function waterfall(iterable $tasks): PromiseInterface
401405 $ pending = null ;
402406 });
403407
404- if (!\is_array ($ tasks )) {
405- $ tasks = \iterator_to_array ($ tasks );
408+ if ($ tasks instanceof \IteratorAggregate) {
409+ $ tasks = $ tasks ->getIterator ();
410+ assert ($ tasks instanceof \Iterator);
406411 }
407412
408413 /** @var callable $next */
409414 $ next = function ($ value = null ) use (&$ tasks , &$ next , $ deferred , &$ pending ) {
410- if (0 === count ( $ tasks) ) {
415+ if ($ tasks instanceof \Iterator ? ! $ tasks-> valid () : ! $ tasks ) {
411416 $ deferred ->resolve ($ value );
412417 return ;
413418 }
414419
415- $ task = array_shift ($ tasks );
416- $ promise = call_user_func_array ($ task , func_get_args ());
420+ if ($ tasks instanceof \Iterator) {
421+ $ task = $ tasks ->current ();
422+ $ tasks ->next ();
423+ } else {
424+ $ task = \array_shift ($ tasks );
425+ }
426+
427+ $ promise = \call_user_func_array ($ task , func_get_args ());
417428 assert ($ promise instanceof PromiseInterface);
418429 $ pending = $ promise ;
419430
0 commit comments