Skip to content

Commit d5c250e

Browse files
committed
refactor!: extract queue config resolution from QueueManager
1 parent 1169dd2 commit d5c250e

5 files changed

Lines changed: 212 additions & 130 deletions

File tree

src/queue_config_resolver.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import type {
2+
Duration,
3+
JobOptions,
4+
QueueConfig,
5+
QueueManagerConfig,
6+
RetryConfig,
7+
} from './types/main.js'
8+
9+
/**
10+
* Resolve effective queue/job runtime configuration from the initialized
11+
* queue config.
12+
*
13+
* This keeps merge rules in one place without coupling execution code to the
14+
* full `QueueManager` lifecycle and adapter concerns.
15+
*/
16+
export class QueueConfigResolver {
17+
readonly #globalRetryConfig?: RetryConfig
18+
readonly #globalJobOptions?: JobOptions
19+
readonly #queueConfigs: Map<string, QueueConfig>
20+
readonly #workerTimeout?: Duration
21+
22+
/**
23+
* Create a resolver from the queue manager config.
24+
*/
25+
static from(config: QueueManagerConfig): QueueConfigResolver {
26+
return new QueueConfigResolver({
27+
globalRetryConfig: config.retry,
28+
globalJobOptions: config.defaultJobOptions,
29+
queueConfigs: new Map(Object.entries(config.queues || {}) as [string, QueueConfig][]),
30+
workerTimeout: config.worker?.timeout,
31+
})
32+
}
33+
34+
/**
35+
* Create a resolver from already-materialized config fragments.
36+
*/
37+
constructor({
38+
globalRetryConfig,
39+
globalJobOptions,
40+
queueConfigs,
41+
workerTimeout,
42+
}: {
43+
globalRetryConfig?: RetryConfig
44+
globalJobOptions?: JobOptions
45+
queueConfigs?: Map<string, QueueConfig>
46+
workerTimeout?: Duration
47+
}) {
48+
this.#globalRetryConfig = globalRetryConfig
49+
this.#globalJobOptions = globalJobOptions
50+
this.#queueConfigs = queueConfigs ?? new Map()
51+
this.#workerTimeout = workerTimeout
52+
}
53+
54+
/**
55+
* Resolve the retry policy for a job using priority: job > queue > global.
56+
*/
57+
resolveRetryConfig(queue: string, jobRetryConfig?: RetryConfig): RetryConfig {
58+
const queueConfig = this.#queueConfigs.get(queue)
59+
const queueRetryConfig = queueConfig?.retry || {}
60+
61+
const maxRetries =
62+
jobRetryConfig?.maxRetries ??
63+
queueRetryConfig.maxRetries ??
64+
this.#globalRetryConfig?.maxRetries ??
65+
0
66+
67+
const backoff =
68+
jobRetryConfig?.backoff || queueRetryConfig.backoff || this.#globalRetryConfig?.backoff
69+
70+
return { maxRetries, backoff }
71+
}
72+
73+
/**
74+
* Resolve effective retention options using priority: job > queue > global.
75+
*/
76+
resolveJobOptions(queue: string, jobOptions?: JobOptions): JobOptions {
77+
const queueConfig = this.#queueConfigs.get(queue)
78+
const queueJobOptions = queueConfig?.defaultJobOptions
79+
80+
return {
81+
removeOnComplete:
82+
jobOptions?.removeOnComplete ??
83+
queueJobOptions?.removeOnComplete ??
84+
this.#globalJobOptions?.removeOnComplete,
85+
removeOnFail:
86+
jobOptions?.removeOnFail ??
87+
queueJobOptions?.removeOnFail ??
88+
this.#globalJobOptions?.removeOnFail,
89+
}
90+
}
91+
92+
/**
93+
* Return the configured default worker timeout.
94+
*/
95+
getWorkerTimeout(): Duration | undefined {
96+
return this.#workerTimeout
97+
}
98+
}

src/queue_manager.ts

Lines changed: 14 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,17 @@ import debug from './debug.js'
33
import { Locator } from './locator.js'
44
import { consoleLogger, type Logger } from './logger.js'
55
import { FakeAdapter } from './drivers/fake_adapter.js'
6+
import { QueueConfigResolver } from './queue_config_resolver.js'
67
import type { Adapter } from './contracts/adapter.js'
7-
import type {
8-
AdapterFactory,
9-
JobFactory,
10-
JobOptions,
11-
QueueConfig,
12-
QueueManagerConfig,
13-
RetryConfig,
14-
} from './types/main.js'
8+
import type { AdapterFactory, JobFactory, QueueManagerConfig } from './types/main.js'
159

1610
type QueueManagerFakeState = {
1711
defaultAdapter: string
1812
adapters: Record<string, AdapterFactory>
1913
adapterInstances: Map<string, Adapter>
20-
globalRetryConfig?: RetryConfig
21-
globalJobOptions?: JobOptions
22-
queueConfigs: Map<string, QueueConfig>
2314
logger: Logger
2415
jobFactory?: JobFactory
16+
configResolver: QueueConfigResolver
2517
fakeAdapter: FakeAdapter
2618
}
2719

@@ -61,11 +53,9 @@ class QueueManagerSingleton {
6153
#defaultAdapter!: string
6254
#adapters: Record<string, AdapterFactory> = {}
6355
#adapterInstances: Map<string, Adapter> = new Map()
64-
#globalRetryConfig?: RetryConfig
65-
#globalJobOptions?: JobOptions
66-
#queueConfigs: Map<string, QueueConfig> = new Map()
6756
#logger: Logger = consoleLogger
6857
#jobFactory?: JobFactory
58+
#configResolver: QueueConfigResolver = new QueueConfigResolver({})
6959
#fakeState?: QueueManagerFakeState
7060

7161
/**
@@ -101,16 +91,9 @@ class QueueManagerSingleton {
10191

10292
this.#defaultAdapter = config.default
10393
this.#adapters = config.adapters
104-
this.#globalRetryConfig = config.retry
105-
this.#globalJobOptions = config.defaultJobOptions
10694
this.#logger = config.logger ?? consoleLogger
10795
this.#jobFactory = config.jobFactory
108-
109-
if (config.queues) {
110-
for (const [queue, queueConfig] of Object.entries(config.queues)) {
111-
this.#queueConfigs.set(queue, queueConfig as QueueConfig)
112-
}
113-
}
96+
this.#configResolver = QueueConfigResolver.from(config)
11497

11598
if (config.locations && config.locations.length > 0) {
11699
const registered = await Locator.registerFromGlob(config.locations)
@@ -216,11 +199,9 @@ class QueueManagerSingleton {
216199
defaultAdapter: this.#defaultAdapter,
217200
adapters: this.#adapters,
218201
adapterInstances: this.#adapterInstances,
219-
globalRetryConfig: this.#globalRetryConfig,
220-
globalJobOptions: this.#globalJobOptions,
221-
queueConfigs: this.#queueConfigs,
222202
logger: this.#logger,
223203
jobFactory: this.#jobFactory,
204+
configResolver: this.#configResolver,
224205
fakeAdapter,
225206
}
226207

@@ -257,44 +238,9 @@ class QueueManagerSingleton {
257238
this.#defaultAdapter = state.defaultAdapter
258239
this.#adapters = state.adapters
259240
this.#adapterInstances = state.adapterInstances
260-
this.#globalRetryConfig = state.globalRetryConfig
261-
this.#globalJobOptions = state.globalJobOptions
262-
this.#queueConfigs = state.queueConfigs
263241
this.#logger = state.logger
264242
this.#jobFactory = state.jobFactory
265-
}
266-
267-
/**
268-
* Get the merged retry configuration for a job.
269-
*
270-
* Configuration is merged with priority: job > queue > global.
271-
* This allows specific jobs or queues to override global defaults.
272-
*
273-
* @param queue - The queue name
274-
* @param jobRetryConfig - Optional job-level retry config
275-
* @returns The merged retry configuration
276-
*
277-
* @example
278-
* ```typescript
279-
* // Global: maxRetries=3, Queue: maxRetries=5, Job: maxRetries=1
280-
* // Result: maxRetries=1 (job wins)
281-
* const config = QueueManager.getMergedRetryConfig('emails', { maxRetries: 1 })
282-
* ```
283-
*/
284-
getMergedRetryConfig(queue: string, jobRetryConfig?: RetryConfig): RetryConfig {
285-
const queueConfig = this.#queueConfigs.get(queue)
286-
const queueRetryConfig = queueConfig?.retry || {}
287-
288-
let maxRetries =
289-
jobRetryConfig?.maxRetries ??
290-
queueRetryConfig.maxRetries ??
291-
this.#globalRetryConfig?.maxRetries ??
292-
0
293-
294-
let backoff =
295-
jobRetryConfig?.backoff || queueRetryConfig.backoff || this.#globalRetryConfig?.backoff
296-
297-
return { maxRetries, backoff }
243+
this.#configResolver = state.configResolver
298244
}
299245

300246
/**
@@ -307,22 +253,14 @@ class QueueManagerSingleton {
307253
}
308254

309255
/**
310-
* Get the merged job options for a job (priority: job > queue > global).
256+
* Get the resolver responsible for effective queue/job runtime config.
311257
*/
312-
getMergedJobOptions(queue: string, jobOptions?: JobOptions): JobOptions {
313-
const queueConfig = this.#queueConfigs.get(queue)
314-
const queueJobOptions = queueConfig?.defaultJobOptions
315-
316-
return {
317-
removeOnComplete:
318-
jobOptions?.removeOnComplete ??
319-
queueJobOptions?.removeOnComplete ??
320-
this.#globalJobOptions?.removeOnComplete,
321-
removeOnFail:
322-
jobOptions?.removeOnFail ??
323-
queueJobOptions?.removeOnFail ??
324-
this.#globalJobOptions?.removeOnFail,
258+
getConfigResolver(): QueueConfigResolver {
259+
if (!this.#initialized) {
260+
throw new errors.E_QUEUE_NOT_INITIALIZED()
325261
}
262+
263+
return this.#configResolver
326264
}
327265

328266
#validateConfig(config: QueueManagerConfig): void {
@@ -376,6 +314,7 @@ class QueueManagerSingleton {
376314

377315
this.#adapterInstances.clear()
378316
this.#initialized = false
317+
this.#configResolver = new QueueConfigResolver({})
379318
this.#fakeState = undefined
380319
}
381320
}

src/worker.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,8 @@ export class Worker {
331331
debug('worker %s: executing job %s (%s)', this.#id, job.id, job.name)
332332

333333
const { instance, options, timeout, context, payload } = await this.#initJob(job, queue)
334-
const retention = QueueManager.getMergedJobOptions(queue, options)
334+
const configResolver = QueueManager.getConfigResolver()
335+
const retention = configResolver.resolveJobOptions(queue, options)
335336

336337
try {
337338
await this.#executeWithTimeout(instance, payload, context, timeout)
@@ -349,7 +350,7 @@ export class Worker {
349350
return
350351
}
351352

352-
const mergedConfig = QueueManager.getMergedRetryConfig(queue, options.retry)
353+
const mergedConfig = configResolver.resolveRetryConfig(queue, options.retry)
353354

354355
if (typeof mergedConfig.maxRetries === 'undefined' || mergedConfig.maxRetries <= 0) {
355356
debug('worker %s: job %s has no retries configured, marking as failed', this.#id, job.id)
@@ -417,7 +418,7 @@ export class Worker {
417418
return { instance, options, timeout, context, payload: job.payload }
418419
} catch (error) {
419420
debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name)
420-
const retention = QueueManager.getMergedJobOptions(queue)
421+
const retention = QueueManager.getConfigResolver().resolveJobOptions(queue)
421422
await this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)
422423
throw error
423424
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { test } from '@japa/runner'
2+
import { exponentialBackoff } from '../src/strategies/backoff_strategy.js'
3+
import { QueueConfigResolver } from '../src/queue_config_resolver.js'
4+
5+
test.group('QueueConfigResolver', () => {
6+
test('should merge retry configurations correctly (global)', ({ assert }) => {
7+
const resolver = QueueConfigResolver.from({
8+
default: 'sync',
9+
adapters: { sync: () => ({}) as any },
10+
retry: { maxRetries: 5, backoff: exponentialBackoff() },
11+
})
12+
13+
let config = resolver.resolveRetryConfig('default')
14+
assert.equal(config.maxRetries, 5)
15+
})
16+
17+
test('should merge retry configurations correctly (queue)', ({ assert }) => {
18+
const resolver = QueueConfigResolver.from({
19+
default: 'sync',
20+
adapters: { sync: () => ({}) as any },
21+
retry: { maxRetries: 5, backoff: exponentialBackoff() },
22+
queues: {
23+
email: { retry: { maxRetries: 3 } },
24+
},
25+
})
26+
27+
let config = resolver.resolveRetryConfig('email')
28+
assert.equal(config.maxRetries, 3)
29+
})
30+
31+
test('should merge retry configurations correctly (job)', ({ assert }) => {
32+
const resolver = QueueConfigResolver.from({
33+
default: 'sync',
34+
adapters: { sync: () => ({}) as any },
35+
retry: { maxRetries: 5, backoff: exponentialBackoff() },
36+
queues: {
37+
email: { retry: { maxRetries: 3 } },
38+
},
39+
})
40+
41+
let config = resolver.resolveRetryConfig('email', { maxRetries: 2 })
42+
assert.equal(config.maxRetries, 2)
43+
})
44+
45+
test('should respect maxRetries: 0 from job config over global/queue config', ({ assert }) => {
46+
const resolver = QueueConfigResolver.from({
47+
default: 'sync',
48+
adapters: { sync: () => ({}) as any },
49+
retry: { maxRetries: 5, backoff: exponentialBackoff() },
50+
queues: {
51+
email: { retry: { maxRetries: 3 } },
52+
},
53+
})
54+
55+
let config = resolver.resolveRetryConfig('email', { maxRetries: 0 })
56+
assert.equal(config.maxRetries, 0)
57+
})
58+
59+
test('should resolve job retention options with correct precedence', ({ assert }) => {
60+
const resolver = QueueConfigResolver.from({
61+
default: 'sync',
62+
adapters: { sync: () => ({}) as any },
63+
defaultJobOptions: { removeOnFail: { age: '7d' } },
64+
queues: {
65+
email: {
66+
defaultJobOptions: {
67+
removeOnFail: { age: '3d' },
68+
removeOnComplete: { count: 50 },
69+
},
70+
},
71+
},
72+
})
73+
74+
const resolved = resolver.resolveJobOptions('email', {
75+
removeOnComplete: false,
76+
})
77+
78+
assert.deepEqual(resolved, {
79+
removeOnComplete: false,
80+
removeOnFail: { age: '3d' },
81+
})
82+
})
83+
84+
test('should expose configured worker timeout', ({ assert }) => {
85+
const resolver = QueueConfigResolver.from({
86+
default: 'sync',
87+
adapters: { sync: () => ({}) as any },
88+
worker: { timeout: '30s' },
89+
})
90+
91+
assert.equal(resolver.getWorkerTimeout(), '30s')
92+
})
93+
})

0 commit comments

Comments
 (0)