Skip to content

Commit 428d91e

Browse files
committed
fix(worker): cleanup timeout abort listener and extract abort race helper
1 parent c226770 commit 428d91e

2 files changed

Lines changed: 102 additions & 4 deletions

File tree

src/worker.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -449,13 +449,43 @@ export class Worker {
449449
const signal = AbortSignal.timeout(timeout)
450450
instance.$hydrate(payload, context, signal)
451451

452+
const { abortPromise, cleanupAbortListener } = this.#createTimeoutAbortRace(
453+
signal,
454+
instance.constructor.name,
455+
timeout
456+
)
457+
458+
try {
459+
await Promise.race([instance.execute(), abortPromise])
460+
} finally {
461+
cleanupAbortListener()
462+
}
463+
}
464+
465+
#createTimeoutAbortRace(signal: AbortSignal, jobName: string, timeout: number) {
466+
let abortHandler: (() => void) | undefined
467+
452468
const abortPromise = new Promise<never>((_, reject) => {
453-
signal.addEventListener('abort', () => {
454-
reject(new errors.E_JOB_TIMEOUT([instance.constructor.name, timeout]))
455-
})
469+
abortHandler = () => {
470+
reject(new errors.E_JOB_TIMEOUT([jobName, timeout]))
471+
}
472+
473+
if (signal.aborted) {
474+
abortHandler()
475+
return
476+
}
477+
478+
signal.addEventListener('abort', abortHandler, { once: true })
456479
})
457480

458-
await Promise.race([instance.execute(), abortPromise])
481+
return {
482+
abortPromise,
483+
cleanupAbortListener: () => {
484+
if (abortHandler) {
485+
signal.removeEventListener('abort', abortHandler)
486+
}
487+
},
488+
}
459489
}
460490

461491
async #acquireNextJob(queues: string[]): Promise<{ job: AcquiredJob; queue: string } | null> {

tests/worker.spec.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,74 @@ test.group('Worker', () => {
461461
assert.isBelow(elapsed, 150, 'Job should be killed before completing')
462462
})
463463

464+
test('should remove timeout abort listener when job completes before timeout', async ({
465+
assert,
466+
cleanup,
467+
}) => {
468+
class FastJob extends Job {
469+
static options = { timeout: 5_000 }
470+
471+
async execute() {}
472+
}
473+
474+
const sharedAdapter = memory()()
475+
476+
const localConfig = {
477+
default: 'memory',
478+
adapters: { memory: () => sharedAdapter },
479+
}
480+
481+
const controller = new AbortController()
482+
const originalTimeout = AbortSignal.timeout
483+
const originalAddEventListener = controller.signal.addEventListener.bind(controller.signal)
484+
const originalRemoveEventListener = controller.signal.removeEventListener.bind(controller.signal)
485+
486+
let addedAbortListeners = 0
487+
let removedAbortListeners = 0
488+
489+
controller.signal.addEventListener = ((type, listener, options) => {
490+
if (type === 'abort') {
491+
addedAbortListeners++
492+
}
493+
494+
return originalAddEventListener(type, listener, options)
495+
}) as AbortSignal['addEventListener']
496+
497+
controller.signal.removeEventListener = ((type, listener, options) => {
498+
if (type === 'abort') {
499+
removedAbortListeners++
500+
}
501+
502+
return originalRemoveEventListener(type, listener, options)
503+
}) as AbortSignal['removeEventListener']
504+
505+
AbortSignal.timeout = (() => controller.signal) as typeof AbortSignal.timeout
506+
507+
Locator.register('FastJob', FastJob)
508+
509+
const worker = new Worker(localConfig)
510+
511+
cleanup(async () => {
512+
AbortSignal.timeout = originalTimeout
513+
Locator.clear()
514+
await worker.stop()
515+
})
516+
517+
await sharedAdapter.push({
518+
id: 'cleanup-timeout-listener-job',
519+
name: 'FastJob',
520+
payload: {},
521+
attempts: 0,
522+
priority: 0,
523+
})
524+
525+
await worker.processCycle(['default']) // started
526+
await worker.processCycle(['default']) // completed
527+
528+
assert.equal(addedAbortListeners, 1)
529+
assert.equal(removedAbortListeners, 1)
530+
})
531+
464532
test('should retry timed out job when failOnTimeout is false', async ({ assert, cleanup }) => {
465533
let attempts = 0
466534

0 commit comments

Comments
 (0)