Skip to content

Commit 106e8ab

Browse files
committed
refactor(execution): track pending work
1 parent 229c9df commit 106e8ab

14 files changed

Lines changed: 506 additions & 121 deletions

src/execution/AsyncWorkTracker.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
4+
/** @internal */
5+
export class AsyncWorkTracker {
6+
pendingAsyncWork: Set<Promise<void>>;
7+
8+
constructor() {
9+
this.pendingAsyncWork = new Set<Promise<void>>();
10+
}
11+
12+
add(promise: Promise<unknown>): void {
13+
const pendingAsyncWork = this.pendingAsyncWork;
14+
const promiseToSettle = promise.then(
15+
() => {
16+
pendingAsyncWork.delete(promiseToSettle);
17+
},
18+
() => {
19+
pendingAsyncWork.delete(promiseToSettle);
20+
},
21+
);
22+
pendingAsyncWork.add(promiseToSettle);
23+
}
24+
25+
addValues(values: ReadonlyArray<PromiseOrValue<unknown>>): void {
26+
for (const value of values) {
27+
if (isPromise(value)) {
28+
this.add(value);
29+
}
30+
}
31+
}
32+
33+
async wait(): Promise<void> {
34+
await Promise.resolve();
35+
return waitForPendingSet(this.pendingAsyncWork);
36+
}
37+
38+
promiseAllTrackOnReject<T>(
39+
values: ReadonlyArray<PromiseOrValue<T>>,
40+
): Promise<Array<T>> {
41+
const promise = Promise.all(values);
42+
promise.then(undefined, () => {
43+
this.addValues(values);
44+
});
45+
return promise;
46+
}
47+
48+
promiseTrackPending<T, TResult>(
49+
values: ReadonlyArray<PromiseOrValue<T>>,
50+
combinator: (promises: ReadonlyArray<Promise<T>>) => Promise<TResult>,
51+
): Promise<TResult> {
52+
const promises = values.map((value) => Promise.resolve(value));
53+
const settled = promises.map(() => false);
54+
55+
for (let index = 0; index < promises.length; index++) {
56+
const promise = promises[index];
57+
promise.then(
58+
() => {
59+
settled[index] = true;
60+
},
61+
() => {
62+
settled[index] = true;
63+
},
64+
);
65+
}
66+
67+
const trackPending = () => {
68+
for (let index = 0; index < promises.length; index++) {
69+
if (!settled[index]) {
70+
this.add(promises[index]);
71+
}
72+
}
73+
};
74+
75+
const promise = combinator(promises);
76+
promise.then(trackPending, trackPending);
77+
return promise;
78+
}
79+
}
80+
81+
async function waitForPendingSet(
82+
pendingPromises: ReadonlySet<Promise<void>>,
83+
): Promise<void> {
84+
while (pendingPromises.size > 0) {
85+
// eslint-disable-next-line no-await-in-loop
86+
await Promise.allSettled(Array.from(pendingPromises));
87+
}
88+
}

src/execution/Executor.ts

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
1111
import { promiseForObject } from '../jsutils/promiseForObject.js';
1212
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
1313
import { promiseReduce } from '../jsutils/promiseReduce.js';
14+
import { toError } from '../jsutils/toError.js';
1415

1516
import { ensureGraphQLError } from '../error/ensureGraphQLError.js';
1617
import type { GraphQLFormattedError } from '../error/GraphQLError.js';
@@ -32,6 +33,7 @@ import type {
3233
GraphQLObjectType,
3334
GraphQLOutputType,
3435
GraphQLResolveInfo,
36+
GraphQLResolveInfoHelpers,
3537
GraphQLTypeResolver,
3638
} from '../type/definition.js';
3739
import {
@@ -231,6 +233,11 @@ export class Executor<
231233
abortResultPromise: ((reason?: unknown) => void) | undefined;
232234
resolverAbortController: AbortController | undefined;
233235
getAbortSignal: () => AbortSignal | undefined;
236+
getAsyncHelpers: () => GraphQLResolveInfoHelpers;
237+
trackPromise: (cleanup: Promise<unknown>) => void;
238+
promiseAll: <T>(
239+
values: ReadonlyArray<PromiseOrValue<T>>,
240+
) => Promise<Array<T>>;
234241

235242
constructor(
236243
validatedExecutionArgs: ValidatedExecutionArgs,
@@ -249,8 +256,12 @@ export class Executor<
249256
} else {
250257
this.sharedExecutionContext = sharedExecutionContext;
251258
}
252-
const { getAbortSignal } = this.sharedExecutionContext;
259+
const { getAbortSignal, getAsyncHelpers, getRegisterCleanup, promiseAll } =
260+
this.sharedExecutionContext;
253261
this.getAbortSignal = getAbortSignal;
262+
this.getAsyncHelpers = getAsyncHelpers;
263+
this.trackPromise = getRegisterCleanup();
264+
this.promiseAll = promiseAll;
254265
}
255266

256267
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
@@ -261,10 +272,7 @@ export class Executor<
261272
if (externalAbortSignal) {
262273
externalAbortSignal.throwIfAborted();
263274
const onExternalAbort = () => {
264-
const aborted = this.abort(externalAbortSignal.reason);
265-
if (isPromise(aborted)) {
266-
aborted.catch(() => undefined);
267-
}
275+
this.abort(externalAbortSignal.reason);
268276
};
269277
removeExternalAbortListener = () =>
270278
externalAbortSignal.removeEventListener('abort', onExternalAbort);
@@ -326,6 +334,7 @@ export class Executor<
326334
return this.buildResponse(null);
327335
},
328336
);
337+
this.sharedExecutionContext.asyncWorkTracker.add(promise);
329338
const { promise: cancellablePromise, abort: abortResultPromise } =
330339
withCancellation(promise);
331340
this.abortResultPromise = abortResultPromise;
@@ -349,7 +358,7 @@ export class Executor<
349358
}
350359
}
351360

352-
abort(reason?: unknown): PromiseOrValue<void> {
361+
abort(reason?: unknown): void {
353362
if (this.aborted) {
354363
return;
355364
}
@@ -508,10 +517,12 @@ export class Executor<
508517
}
509518
} catch (error) {
510519
if (containsPromise) {
511-
// Ensure that any promises returned by other fields are handled, as they may also reject.
512-
return promiseForObject(results).finally(() => {
513-
throw error;
514-
}) as never;
520+
this.sharedExecutionContext.asyncWorkTracker.addValues(
521+
Object.values(results),
522+
);
523+
if (this.aborted) {
524+
return Promise.reject(toError(error));
525+
}
515526
}
516527
throw error;
517528
}
@@ -524,7 +535,7 @@ export class Executor<
524535
// Otherwise, results is a map from field name to the result of resolving that
525536
// field, which is possibly a promise. Return a promise that will return this
526537
// same map, but with any promises replaced with the values they resolved to.
527-
return promiseForObject(results);
538+
return promiseForObject(results, this.promiseAll);
528539
}
529540

530541
/**
@@ -561,6 +572,7 @@ export class Executor<
561572
parentType,
562573
path,
563574
this.getAbortSignal,
575+
this.getAsyncHelpers,
564576
);
565577

566578
// Get the resolve function, regardless of if its result is normal or abrupt (error).
@@ -857,26 +869,26 @@ export class Executor<
857869
index++;
858870
}
859871
} catch (error) {
860-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
861-
returnIteratorCatchingErrors(asyncIterator);
872+
this.trackPromise(returnIteratorCatchingErrors(asyncIterator));
862873
if (containsPromise) {
863-
return Promise.all(completedResults).finally(() => {
864-
throw error;
865-
});
874+
this.sharedExecutionContext.asyncWorkTracker.addValues(
875+
completedResults,
876+
);
866877
}
867878
throw error;
868879
}
869880

870881
// Throwing on completion outside of the loop may allow engines to better optimize
871882
if (this.aborted) {
872883
if (!iteration?.done) {
873-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
874-
returnIteratorCatchingErrors(asyncIterator);
884+
this.trackPromise(returnIteratorCatchingErrors(asyncIterator));
875885
}
876886
throw new Error('Aborted!');
877887
}
878888

879-
return containsPromise ? Promise.all(completedResults) : completedResults;
889+
return containsPromise
890+
? this.promiseAll(completedResults)
891+
: completedResults;
880892
}
881893

882894
/* c8 ignore next 12 */
@@ -997,17 +1009,17 @@ export class Executor<
9971009
index++;
9981010
}
9991011
} catch (error) {
1000-
const maybePromises = containsPromise ? completedResults : [];
1001-
maybePromises.push(...collectIteratorPromises(iterator));
1002-
if (maybePromises.length) {
1003-
return Promise.all(maybePromises).finally(() => {
1004-
throw error;
1005-
});
1012+
const asyncWorkTracker = this.sharedExecutionContext.asyncWorkTracker;
1013+
if (containsPromise) {
1014+
asyncWorkTracker.addValues(completedResults);
10061015
}
1016+
asyncWorkTracker.addValues(collectIteratorPromises(iterator));
10071017
throw error;
10081018
}
10091019

1010-
return containsPromise ? Promise.all(completedResults) : completedResults;
1020+
return containsPromise
1021+
? this.promiseAll(completedResults)
1022+
: completedResults;
10111023
}
10121024

10131025
completeMaybePromisedListItemValue(

0 commit comments

Comments
 (0)