@@ -548,19 +548,10 @@ function parallel(iterable $tasks): PromiseInterface
548548 $ pending = [];
549549 });
550550 $ results = [];
551- $ errored = false ;
551+ $ continue = true ;
552552
553- if (!\is_array ($ tasks )) {
554- $ tasks = \iterator_to_array ($ tasks );
555- }
556-
557- $ numTasks = count ($ tasks );
558- if (0 === $ numTasks ) {
559- $ deferred ->resolve ($ results );
560- }
561-
562- $ taskErrback = function ($ error ) use (&$ pending , $ deferred , &$ errored ) {
563- $ errored = true ;
553+ $ taskErrback = function ($ error ) use (&$ pending , $ deferred , &$ continue ) {
554+ $ continue = false ;
564555 $ deferred ->reject ($ error );
565556
566557 foreach ($ pending as $ promise ) {
@@ -572,25 +563,31 @@ function parallel(iterable $tasks): PromiseInterface
572563 };
573564
574565 foreach ($ tasks as $ i => $ task ) {
575- $ taskCallback = function ($ result ) use (&$ results , &$ pending , $ numTasks , $ i , $ deferred ) {
566+ $ taskCallback = function ($ result ) use (&$ results , &$ pending , & $ continue , $ i , $ deferred ) {
576567 $ results [$ i ] = $ result ;
568+ unset($ pending [$ i ]);
577569
578- if (count ( $ results ) === $ numTasks ) {
570+ if (! $ pending && ! $ continue ) {
579571 $ deferred ->resolve ($ results );
580572 }
581573 };
582574
583- $ promise = call_user_func ($ task );
575+ $ promise = \ call_user_func ($ task );
584576 assert ($ promise instanceof PromiseInterface);
585577 $ pending [$ i ] = $ promise ;
586578
587579 $ promise ->then ($ taskCallback , $ taskErrback );
588580
589- if ($ errored ) {
581+ if (! $ continue ) {
590582 break ;
591583 }
592584 }
593585
586+ $ continue = false ;
587+ if (!$ pending ) {
588+ $ deferred ->resolve ($ results );
589+ }
590+
594591 return $ deferred ->promise ();
595592}
596593
@@ -609,8 +606,9 @@ function series(iterable $tasks): PromiseInterface
609606 });
610607 $ results = [];
611608
612- if (!\is_array ($ tasks )) {
613- $ tasks = \iterator_to_array ($ tasks );
609+ if ($ tasks instanceof \IteratorAggregate) {
610+ $ tasks = $ tasks ->getIterator ();
611+ assert ($ tasks instanceof \Iterator);
614612 }
615613
616614 /** @var callable():void $next */
@@ -620,13 +618,19 @@ function series(iterable $tasks): PromiseInterface
620618 };
621619
622620 $ next = function () use (&$ tasks , $ taskCallback , $ deferred , &$ results , &$ pending ) {
623- if (0 === count ( $ tasks) ) {
621+ if ($ tasks instanceof \Iterator ? ! $ tasks-> valid () : ! $ tasks ) {
624622 $ deferred ->resolve ($ results );
625623 return ;
626624 }
627625
628- $ task = array_shift ($ tasks );
629- $ promise = call_user_func ($ task );
626+ if ($ tasks instanceof \Iterator) {
627+ $ task = $ tasks ->current ();
628+ $ tasks ->next ();
629+ } else {
630+ $ task = \array_shift ($ tasks );
631+ }
632+
633+ $ promise = \call_user_func ($ task );
630634 assert ($ promise instanceof PromiseInterface);
631635 $ pending = $ promise ;
632636
@@ -652,19 +656,26 @@ function waterfall(iterable $tasks): PromiseInterface
652656 $ pending = null ;
653657 });
654658
655- if (!\is_array ($ tasks )) {
656- $ tasks = \iterator_to_array ($ tasks );
659+ if ($ tasks instanceof \IteratorAggregate) {
660+ $ tasks = $ tasks ->getIterator ();
661+ assert ($ tasks instanceof \Iterator);
657662 }
658663
659664 /** @var callable $next */
660665 $ next = function ($ value = null ) use (&$ tasks , &$ next , $ deferred , &$ pending ) {
661- if (0 === count ( $ tasks) ) {
666+ if ($ tasks instanceof \Iterator ? ! $ tasks-> valid () : ! $ tasks ) {
662667 $ deferred ->resolve ($ value );
663668 return ;
664669 }
665670
666- $ task = array_shift ($ tasks );
667- $ promise = call_user_func_array ($ task , func_get_args ());
671+ if ($ tasks instanceof \Iterator) {
672+ $ task = $ tasks ->current ();
673+ $ tasks ->next ();
674+ } else {
675+ $ task = \array_shift ($ tasks );
676+ }
677+
678+ $ promise = \call_user_func_array ($ task , func_get_args ());
668679 assert ($ promise instanceof PromiseInterface);
669680 $ pending = $ promise ;
670681
0 commit comments