Skip to content

Commit 0708d2a

Browse files
committed
feat(execution): track pending async work
access finalization of all async work beyond that included in the result via experimental hook Access the asyncWorkFinished hook via `experimentalHooks` on execution args: ```ts const result = execute({ schema, document: parse('{ test }'), experimentalHooks: { queryOrMutationOrSubscriptionEventAsyncWorkFinished({ validatedExecutionArgs, }) { const operationName = validatedExecutionArgs.operationName ?? '<anonymous>'; console.log(`async work finished for operation: ${operationName}`); }, }, }); ``` `execute(...)` can return synchronously (for example after a synchronous bubbled error) while tracked async work is still pending. To always get a Promise that resolves only after this hook is called: ```ts const args: ExecutionArgs = { schema, document: parse('{ test }'), }; let hookCalled = false; let resolveAsyncWorkFinished: (() => void) | undefined; const asyncWorkFinished = new Promise<void>((resolve) => { resolveAsyncWorkFinished = resolve; }); const existingHook = args.experimentalHooks ?.queryOrMutationOrSubscriptionEventAsyncWorkFinished; const result = execute({ ...args, experimentalHooks: { ...(args.experimentalHooks ?? {}), queryOrMutationOrSubscriptionEventAsyncWorkFinished(info) { try { existingHook?.(info); } finally { hookCalled = true; resolveAsyncWorkFinished?.(); } }, }, }); const resultAfterAsyncWorkFinished: Promise<ExecutionResult> = Promise.resolve(result).then((executionResult) => hookCalled ? executionResult : asyncWorkFinished.then(() => executionResult), ); ``` This is safe whether `result` is sync or async, and whether the hook callback is reached immediately or later. Async work started inside resolvers is not automatically guaranteed to be tracked. For example, `Promise.all([...])` rejects on the first rejection, so sibling promises may still be pending after `execute(...)` has already produced a result. Use `info.getAsyncHelpers()` to include that work in async tracking: ```ts resolve(_source, _args, _context, info) { const { promiseAll } = info.getAsyncHelpers(); return promiseAll([ fetchFromA(), fetchFromB(), fetchFromC(), ]); } ``` `promiseAny` and `promiseRace` similarly track non-winning/non-settled promises, and `trackPromise` can be used for any additional Promise you want included: ```ts resolve(_source, _args, _context, info) { const { trackPromise } = info.getAsyncHelpers(); trackPromise(cleanupAfterResponse()); return 'ok'; } ```
1 parent d03118a commit 0708d2a

14 files changed

Lines changed: 819 additions & 87 deletions

src/execution/AsyncWorkTracker.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
4+
/** @internal */
5+
export class AsyncWorkTracker {
6+
pendingAsyncWork: Set<Promise<void>>;
7+
8+
constructor() {
9+
this.pendingAsyncWork = new Set<Promise<void>>();
10+
}
11+
12+
add(promise: Promise<unknown>): void {
13+
const pendingAsyncWork = this.pendingAsyncWork;
14+
const promiseToSettle = promise.then(
15+
() => {
16+
pendingAsyncWork.delete(promiseToSettle);
17+
},
18+
() => {
19+
pendingAsyncWork.delete(promiseToSettle);
20+
},
21+
);
22+
pendingAsyncWork.add(promiseToSettle);
23+
}
24+
25+
addValues(values: ReadonlyArray<PromiseOrValue<unknown>>): void {
26+
for (const value of values) {
27+
if (isPromise(value)) {
28+
this.add(value);
29+
}
30+
}
31+
}
32+
33+
async wait(): Promise<void> {
34+
await Promise.resolve();
35+
while (this.pendingAsyncWork.size > 0) {
36+
// eslint-disable-next-line no-await-in-loop
37+
await Promise.allSettled(Array.from(this.pendingAsyncWork));
38+
}
39+
}
40+
41+
promiseAllTrackOnReject<T>(
42+
values: ReadonlyArray<PromiseOrValue<T>>,
43+
): Promise<Array<T>> {
44+
const promise = Promise.all(values);
45+
promise.then(undefined, () => {
46+
this.addValues(values);
47+
});
48+
return promise;
49+
}
50+
51+
promiseCombinatorWithTracking<T, TResult>(
52+
values: ReadonlyArray<PromiseOrValue<T>>,
53+
combinator: (
54+
promises: ReadonlyArray<PromiseOrValue<T>>,
55+
) => Promise<TResult>,
56+
): Promise<TResult> {
57+
this.add(Promise.allSettled(values));
58+
return combinator(values);
59+
}
60+
}

src/execution/Executor.ts

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import type {
3232
GraphQLObjectType,
3333
GraphQLOutputType,
3434
GraphQLResolveInfo,
35+
GraphQLResolveInfoHelpers,
3536
GraphQLTypeResolver,
3637
} from '../type/definition.js';
3738
import {
@@ -60,6 +61,8 @@ import { createSharedExecutionContext } from './createSharedExecutionContext.js'
6061
import { buildResolveInfo } from './execute.js';
6162
import type { StreamUsage } from './getStreamUsage.js';
6263
import { getStreamUsage as _getStreamUsage } from './getStreamUsage.js';
64+
import type { ExecutionHooks } from './hooks.js';
65+
import { runPostExecutionHooks } from './hooks.js';
6366
import { returnIteratorCatchingErrors } from './returnIteratorCatchingErrors.js';
6467
import type { VariableValues } from './values.js';
6568
import { getArgumentValues } from './values.js';
@@ -115,6 +118,7 @@ export interface ValidatedExecutionArgs {
115118
errorPropagation: boolean;
116119
externalAbortSignal: AbortSignal | undefined;
117120
enableEarlyExecution: boolean;
121+
experimentalHooks: ExecutionHooks | undefined;
118122
}
119123

120124
/**
@@ -231,6 +235,11 @@ export class Executor<
231235
abortResultPromise: ((reason?: unknown) => void) | undefined;
232236
resolverAbortController: AbortController | undefined;
233237
getAbortSignal: () => AbortSignal | undefined;
238+
getAsyncHelpers: () => GraphQLResolveInfoHelpers;
239+
trackPromise: (cleanup: Promise<unknown>) => void;
240+
promiseAll: <T>(
241+
values: ReadonlyArray<PromiseOrValue<T>>,
242+
) => Promise<Array<T>>;
234243

235244
constructor(
236245
validatedExecutionArgs: ValidatedExecutionArgs,
@@ -249,8 +258,12 @@ export class Executor<
249258
} else {
250259
this.sharedExecutionContext = sharedExecutionContext;
251260
}
252-
const { getAbortSignal } = this.sharedExecutionContext;
261+
const { getAbortSignal, getAsyncHelpers, getTrackPromise, promiseAll } =
262+
this.sharedExecutionContext;
253263
this.getAbortSignal = getAbortSignal;
264+
this.getAsyncHelpers = getAsyncHelpers;
265+
this.trackPromise = getTrackPromise();
266+
this.promiseAll = promiseAll;
254267
}
255268

256269
executeQueryOrMutationOrSubscriptionEvent(): PromiseOrValue<
@@ -261,10 +274,7 @@ export class Executor<
261274
if (externalAbortSignal) {
262275
externalAbortSignal.throwIfAborted();
263276
const onExternalAbort = () => {
264-
const aborted = this.abort(externalAbortSignal.reason);
265-
if (isPromise(aborted)) {
266-
aborted.catch(() => undefined);
267-
}
277+
this.abort(externalAbortSignal.reason);
268278
};
269279
removeExternalAbortListener = () =>
270280
externalAbortSignal.removeEventListener('abort', onExternalAbort);
@@ -326,6 +336,7 @@ export class Executor<
326336
return this.buildResponse(null);
327337
},
328338
);
339+
this.sharedExecutionContext.asyncWorkTracker.add(promise);
329340
const { promise: cancellablePromise, abort: abortResultPromise } =
330341
withCancellation(promise);
331342
this.abortResultPromise = abortResultPromise;
@@ -349,7 +360,7 @@ export class Executor<
349360
}
350361
}
351362

352-
abort(reason?: unknown): PromiseOrValue<void> {
363+
abort(reason?: unknown): void {
353364
if (this.aborted) {
354365
return;
355366
}
@@ -359,9 +370,23 @@ export class Executor<
359370
}
360371
this.abortResultPromise?.(this.abortReason);
361372
this.resolverAbortController?.abort(this.abortReason);
373+
const hooks = this.validatedExecutionArgs.experimentalHooks;
374+
if (hooks !== undefined) {
375+
runPostExecutionHooks(
376+
this.validatedExecutionArgs,
377+
this.sharedExecutionContext,
378+
);
379+
}
362380
}
363381

364382
finish(): void {
383+
const hooks = this.validatedExecutionArgs.experimentalHooks;
384+
if (hooks !== undefined) {
385+
runPostExecutionHooks(
386+
this.validatedExecutionArgs,
387+
this.sharedExecutionContext,
388+
);
389+
}
365390
this.throwIfAborted();
366391
this.aborted = true;
367392
}
@@ -508,8 +533,9 @@ export class Executor<
508533
}
509534
} catch (error) {
510535
if (containsPromise) {
511-
// Ensure that any promises returned by other fields are handled, as they may also reject.
512-
promiseForObject(results).catch(() => undefined);
536+
this.sharedExecutionContext.asyncWorkTracker.addValues(
537+
Object.values(results),
538+
);
513539
}
514540
throw error;
515541
}
@@ -522,7 +548,7 @@ export class Executor<
522548
// Otherwise, results is a map from field name to the result of resolving that
523549
// field, which is possibly a promise. Return a promise that will return this
524550
// same map, but with any promises replaced with the values they resolved to.
525-
return promiseForObject(results);
551+
return promiseForObject(results, this.promiseAll);
526552
}
527553

528554
/**
@@ -559,6 +585,7 @@ export class Executor<
559585
parentType,
560586
path,
561587
this.getAbortSignal,
588+
this.getAsyncHelpers,
562589
);
563590

564591
// Get the resolve function, regardless of if its result is normal or abrupt (error).
@@ -855,24 +882,26 @@ export class Executor<
855882
index++;
856883
}
857884
} catch (error) {
858-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
859-
returnIteratorCatchingErrors(asyncIterator);
885+
this.trackPromise(returnIteratorCatchingErrors(asyncIterator));
860886
if (containsPromise) {
861-
Promise.all(completedResults).catch(() => undefined);
887+
this.sharedExecutionContext.asyncWorkTracker.addValues(
888+
completedResults,
889+
);
862890
}
863891
throw error;
864892
}
865893

866894
// Throwing on completion outside of the loop may allow engines to better optimize
867895
if (this.aborted) {
868896
if (!iteration?.done) {
869-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
870-
returnIteratorCatchingErrors(asyncIterator);
897+
this.trackPromise(returnIteratorCatchingErrors(asyncIterator));
871898
}
872899
throw new Error('Aborted!');
873900
}
874901

875-
return containsPromise ? Promise.all(completedResults) : completedResults;
902+
return containsPromise
903+
? this.promiseAll(completedResults)
904+
: completedResults;
876905
}
877906

878907
/* c8 ignore next 12 */
@@ -993,15 +1022,17 @@ export class Executor<
9931022
index++;
9941023
}
9951024
} catch (error) {
996-
const maybePromises = containsPromise ? completedResults : [];
997-
maybePromises.push(...collectIteratorPromises(iterator));
998-
if (maybePromises.length) {
999-
Promise.all(maybePromises).catch(() => undefined);
1025+
const asyncWorkTracker = this.sharedExecutionContext.asyncWorkTracker;
1026+
if (containsPromise) {
1027+
asyncWorkTracker.addValues(completedResults);
10001028
}
1029+
asyncWorkTracker.addValues(collectIteratorPromises(iterator));
10011030
throw error;
10021031
}
10031032

1004-
return containsPromise ? Promise.all(completedResults) : completedResults;
1033+
return containsPromise
1034+
? this.promiseAll(completedResults)
1035+
: completedResults;
10051036
}
10061037

10071038
completeMaybePromisedListItemValue(

0 commit comments

Comments
 (0)