Skip to content

Commit 2f279a4

Browse files
committed
fix(sync): handle delayed execution errors
1 parent ab8f2c1 commit 2f279a4

4 files changed

Lines changed: 87 additions & 1 deletion

File tree

src/drivers/sync_adapter.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ export class SyncAdapter implements Adapter {
4141

4242
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
4343
setTimeout(() => {
44-
void this.#execute(jobData, queue)
44+
void this.#execute(jobData, queue).catch((error) => {
45+
QueueManager.getLogger().error(
46+
{ err: error, jobId: jobData.id, jobName: jobData.name, queue },
47+
'Failed to execute delayed sync job'
48+
)
49+
})
4550
}, delay)
4651

4752
return Promise.resolve()

src/queue_manager.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,13 @@ class QueueManagerSingleton {
252252
return this.#jobFactory
253253
}
254254

255+
/**
256+
* Get the configured logger used by the queue runtime.
257+
*/
258+
getLogger(): Logger {
259+
return this.#logger
260+
}
261+
255262
/**
256263
* Get the resolver responsible for effective queue/job runtime config.
257264
*/

tests/queue_manager.spec.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ test.group('QueueManager', () => {
8181
assert.exists(resolver)
8282
})
8383

84+
test('should expose the configured logger', async ({ assert }) => {
85+
const logger = new MemoryLogger()
86+
87+
await QueueManager.init({
88+
default: 'sync',
89+
adapters: { sync: sync() },
90+
logger,
91+
})
92+
93+
assert.strictEqual(QueueManager.getLogger(), logger)
94+
})
95+
8496
test('should throw E_QUEUE_NOT_INITIALIZED when use() called before init()', async ({
8597
assert,
8698
}) => {

tests/sync_adapter.spec.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import { setTimeout as sleep } from 'node:timers/promises'
12
import { test } from '@japa/runner'
23
import { Job } from '../src/job.js'
34
import { Locator } from '../src/locator.js'
45
import { QueueManager } from '../src/queue_manager.js'
56
import { sync } from '../src/drivers/sync_adapter.js'
67
import * as errors from '../src/exceptions.js'
8+
import { MemoryLogger } from './_mocks/memory_logger.js'
79

810
test.group('SyncAdapter', (group) => {
911
group.each.teardown(async () => {
@@ -55,4 +57,64 @@ test.group('SyncAdapter', (group) => {
5557
assert.instanceOf(failedError, errors.E_JOB_MAX_ATTEMPTS_REACHED)
5658
assert.deepEqual(contextJobIds, Array(contextJobIds.length).fill(jobId))
5759
})
60+
61+
test('should log delayed sync job failures without unhandled rejections', async ({
62+
assert,
63+
}) => {
64+
const logger = new MemoryLogger()
65+
let unhandledError: unknown
66+
const onUnhandledRejection = (error: unknown) => {
67+
unhandledError = error
68+
}
69+
70+
class DelayedFailingSyncJob extends Job<Record<string, never>> {
71+
async execute() {
72+
throw new Error('boom')
73+
}
74+
75+
async failed() {
76+
throw new Error('failed hook exploded')
77+
}
78+
}
79+
80+
process.once('unhandledRejection', onUnhandledRejection)
81+
82+
try {
83+
await QueueManager.init({
84+
default: 'sync',
85+
adapters: { sync: sync() },
86+
logger,
87+
})
88+
89+
Locator.register('DelayedFailingSyncJob', DelayedFailingSyncJob)
90+
91+
const adapter = QueueManager.use()
92+
93+
await adapter.pushLaterOn(
94+
'default',
95+
{
96+
id: 'delayed-sync-job',
97+
name: 'DelayedFailingSyncJob',
98+
payload: {},
99+
attempts: 0,
100+
priority: 0,
101+
},
102+
0
103+
)
104+
105+
await sleep(20)
106+
} finally {
107+
process.removeListener('unhandledRejection', onUnhandledRejection)
108+
}
109+
110+
assert.isUndefined(unhandledError)
111+
assert.lengthOf(logger.logs, 1)
112+
assert.equal(logger.logs[0].level, 'error')
113+
assert.equal(logger.logs[0].message, 'Failed to execute delayed sync job')
114+
assert.equal(logger.logs[0].obj?.jobId, 'delayed-sync-job')
115+
assert.equal(logger.logs[0].obj?.jobName, 'DelayedFailingSyncJob')
116+
assert.equal(logger.logs[0].obj?.queue, 'default')
117+
assert.instanceOf(logger.logs[0].obj?.err, Error)
118+
assert.equal((logger.logs[0].obj?.err as Error).message, 'failed hook exploded')
119+
})
58120
})

0 commit comments

Comments
 (0)