Skip to content

Commit ab8f2c1

Browse files
committed
fix(sync): honor retries and failed hook with shared job runtime
1 parent d5c250e commit ab8f2c1

8 files changed

Lines changed: 527 additions & 112 deletions

File tree

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,17 @@ import { sync } from '@boringnode/queue/drivers/sync_adapter'
292292
const adapter = sync() // Jobs execute immediately
293293
```
294294

295+
Use the `sync` adapter for tests and lightweight local development only.
296+
297+
- `await MyJob.dispatch(payload).run()` waits for the job to fully finish.
298+
- Retries are executed inline, not by a background worker.
299+
- If you configure backoff, the adapter will `sleep` between attempts.
300+
- This means the caller can stay blocked for the full retry duration.
301+
302+
Example: with `maxRetries: 3` and an exponential backoff of `1s`, `2s`, `4s`,
303+
the request or command that dispatched the job can remain blocked for about 7 seconds
304+
before the job exhausts its retries and runs `failed()`.
305+
295306
## Job Options
296307

297308
```typescript
@@ -338,6 +349,14 @@ export default class ReliableJob extends Job<Payload> {
338349
}
339350
```
340351

352+
`maxRetries` can be defined directly on the job options, and `retry.backoff`
353+
controls the delay between attempts.
354+
355+
> With the `sync` adapter, these delays happen inline in the caller via
356+
> `sleep`. If a job fails repeatedly, `dispatch().run()` will take as long as
357+
> the total backoff duration. Use a worker-backed adapter when you do not want
358+
> retries to slow down the request/command that dispatched the job.
359+
341360
<details>
342361
<summary><strong>Available strategies</strong></summary>
343362

src/drivers/sync_adapter.ts

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { setTimeout as sleep } from 'node:timers/promises'
12
import { Locator } from '../locator.js'
23
import { QueueManager } from '../queue_manager.js'
4+
import { JobExecutionRuntime } from '../job_runtime.js'
35
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
46
import type {
57
JobContext,
@@ -30,7 +32,7 @@ export class SyncAdapter implements Adapter {
3032
}
3133

3234
/**
 * Execute the job immediately on the given queue.
 *
 * Resolves only once the job has fully finished — including any inline
 * retries and backoff sleeps performed by `#execute` — so callers of the
 * sync adapter block for the whole run.
 */
pushOn(queue: string, jobData: JobData): Promise<void> {
  return this.#execute(jobData, queue)
}
3537

3638
pushLater(jobData: JobData, delay: number): Promise<void> {
@@ -39,7 +41,7 @@ export class SyncAdapter implements Adapter {
3941

4042
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
4143
setTimeout(() => {
42-
void this.#execute(jobData.name, jobData.payload, queue)
44+
void this.#execute(jobData, queue)
4345
}, delay)
4446

4547
return Promise.resolve()
@@ -142,27 +144,58 @@ export class SyncAdapter implements Adapter {
142144
return Promise.resolve(null)
143145
}
144146

145-
async #execute(jobName: string, payload: unknown, queue: string = 'default'): Promise<void> {
146-
const JobClass = Locator.get(jobName)
147+
async #execute(jobData: JobData, queue: string = 'default'): Promise<void> {
148+
const JobClass = Locator.get(jobData.name)
147149

148150
if (!JobClass) {
149-
throw new Error(`Job class ${jobName} not found.`)
150-
}
151-
152-
const context: JobContext = {
153-
jobId: `sync-${Date.now()}`,
154-
name: jobName,
155-
attempt: 1,
156-
queue,
157-
priority: DEFAULT_PRIORITY,
158-
acquiredAt: new Date(),
159-
stalledCount: 0,
151+
throw new Error(`Job class ${jobData.name} not found.`)
160152
}
161153

154+
const options = JobClass.options || {}
155+
const configResolver = QueueManager.getConfigResolver()
156+
const runtime = JobExecutionRuntime.from({
157+
jobName: jobData.name,
158+
options,
159+
retryConfig: configResolver.resolveRetryConfig(queue, options),
160+
defaultTimeout: configResolver.getWorkerTimeout(),
161+
})
162162
const jobFactory = QueueManager.getJobFactory()
163-
const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass()
164-
165-
jobInstance.$hydrate(payload, context)
166-
await jobInstance.execute()
163+
let attempts = jobData.attempts
164+
165+
while (true) {
166+
const context: JobContext = {
167+
jobId: jobData.id,
168+
name: jobData.name,
169+
attempt: attempts + 1,
170+
queue,
171+
priority: jobData.priority ?? DEFAULT_PRIORITY,
172+
acquiredAt: new Date(),
173+
stalledCount: jobData.stalledCount ?? 0,
174+
}
175+
176+
const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass()
177+
178+
try {
179+
await runtime.execute(jobInstance, jobData.payload, context)
180+
return
181+
} catch (error) {
182+
const outcome = runtime.resolveFailure(error as Error, attempts)
183+
184+
if (outcome.type === 'failed') {
185+
await jobInstance.failed?.(outcome.hookError)
186+
return
187+
}
188+
189+
attempts++
190+
191+
if (outcome.type === 'retry' && outcome.retryAt) {
192+
const delay = outcome.retryAt.getTime() - Date.now()
193+
194+
if (delay > 0) {
195+
await sleep(delay)
196+
}
197+
}
198+
}
199+
}
167200
}
168201
}

src/job_runtime.ts

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import * as errors from './exceptions.js'
2+
import type { Job } from './job.js'
3+
import type { Duration, JobContext, JobOptions, RetryConfig } from './types/main.js'
4+
import { parse } from './utils.js'
5+
6+
export type JobExecutionOutcome =
7+
| { type: 'completed' }
8+
| { type: 'retry'; retryAt?: Date }
9+
| {
10+
type: 'failed'
11+
reason: 'timeout' | 'no-retries' | 'max-attempts'
12+
storageError: Error
13+
hookError: Error
14+
}
15+
16+
type JobExecutionRuntimeConfig = {
17+
jobName: string
18+
options?: JobOptions
19+
retryConfig: RetryConfig
20+
defaultTimeout?: Duration
21+
}
22+
23+
type JobExecutionRuntimeFactoryOptions = {
24+
jobName: string
25+
options?: JobOptions
26+
retryConfig: RetryConfig
27+
defaultTimeout?: Duration
28+
}
29+
30+
/**
31+
* Shared execution policy for a single job runtime.
32+
*
33+
* It encapsulates timeout resolution and retry/failure decisions so the
34+
* worker and the sync adapter follow the same execution rules.
35+
*/
36+
export class JobExecutionRuntime {
37+
readonly #jobName: string
38+
readonly #options: JobOptions
39+
readonly #retryConfig: RetryConfig
40+
readonly #timeout?: number
41+
42+
/**
43+
* Build a runtime from already-resolved queue/job execution config.
44+
*/
45+
static from({
46+
jobName,
47+
options,
48+
retryConfig,
49+
defaultTimeout,
50+
}: JobExecutionRuntimeFactoryOptions): JobExecutionRuntime {
51+
return new JobExecutionRuntime({
52+
jobName,
53+
options,
54+
retryConfig,
55+
defaultTimeout,
56+
})
57+
}
58+
59+
get maxRetries(): number | undefined {
60+
return this.#retryConfig.maxRetries
61+
}
62+
63+
/**
64+
* Create a runtime with fully resolved retry and timeout settings.
65+
*/
66+
constructor({ jobName, options, retryConfig, defaultTimeout }: JobExecutionRuntimeConfig) {
67+
this.#jobName = jobName
68+
this.#options = options || {}
69+
this.#retryConfig = retryConfig
70+
71+
const timeout = this.#options.timeout ?? defaultTimeout
72+
this.#timeout = timeout === undefined ? undefined : parse(timeout)
73+
}
74+
75+
/**
76+
* Execute a hydrated job instance and enforce the configured timeout.
77+
*/
78+
async execute(instance: Job, payload: unknown, context: JobContext): Promise<void> {
79+
if (this.#timeout === undefined) {
80+
instance.$hydrate(payload, context)
81+
return instance.execute()
82+
}
83+
84+
const signal = AbortSignal.timeout(this.#timeout)
85+
instance.$hydrate(payload, context, signal)
86+
87+
const { abortPromise, cleanupAbortListener } = this.#createTimeoutAbortRace(
88+
signal,
89+
instance.constructor.name
90+
)
91+
92+
try {
93+
await Promise.race([instance.execute(), abortPromise])
94+
} finally {
95+
cleanupAbortListener()
96+
}
97+
}
98+
99+
/**
100+
* Convert an execution error into a retry or permanent-failure outcome.
101+
*/
102+
resolveFailure(error: Error, attempts: number): JobExecutionOutcome {
103+
if (error instanceof errors.E_JOB_TIMEOUT && this.#options.failOnTimeout) {
104+
return {
105+
type: 'failed',
106+
reason: 'timeout',
107+
storageError: error,
108+
hookError: error,
109+
}
110+
}
111+
112+
if (typeof this.#retryConfig.maxRetries === 'undefined' || this.#retryConfig.maxRetries <= 0) {
113+
return {
114+
type: 'failed',
115+
reason: 'no-retries',
116+
storageError: error,
117+
hookError: error,
118+
}
119+
}
120+
121+
if (attempts >= this.#retryConfig.maxRetries) {
122+
return {
123+
type: 'failed',
124+
reason: 'max-attempts',
125+
storageError: error,
126+
hookError: new errors.E_JOB_MAX_ATTEMPTS_REACHED([this.#jobName], { cause: error }),
127+
}
128+
}
129+
130+
if (this.#retryConfig.backoff) {
131+
return {
132+
type: 'retry',
133+
retryAt: this.#retryConfig.backoff().getNextRetryAt(attempts + 1),
134+
}
135+
}
136+
137+
return { type: 'retry' }
138+
}
139+
140+
/**
141+
* Create the timeout race used to abort a job execution cleanly.
142+
*/
143+
#createTimeoutAbortRace(signal: AbortSignal, runtimeJobName: string) {
144+
const timeout = this.#timeout!
145+
let abortHandler: (() => void) | undefined
146+
147+
const abortPromise = new Promise<never>((_, reject) => {
148+
abortHandler = () => {
149+
reject(new errors.E_JOB_TIMEOUT([runtimeJobName, timeout]))
150+
}
151+
152+
if (signal.aborted) {
153+
abortHandler()
154+
return
155+
}
156+
157+
signal.addEventListener('abort', abortHandler, { once: true })
158+
})
159+
160+
return {
161+
abortPromise,
162+
cleanupAbortListener: () => {
163+
if (abortHandler) {
164+
signal.removeEventListener('abort', abortHandler)
165+
}
166+
},
167+
}
168+
}
169+
}

src/queue_config_resolver.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ export class QueueConfigResolver {
5454
/**
5555
* Resolve the retry policy for a job using priority: job > queue > global.
5656
*/
57-
resolveRetryConfig(queue: string, jobRetryConfig?: RetryConfig): RetryConfig {
57+
resolveRetryConfig(queue: string, jobOptions?: JobOptions): RetryConfig {
5858
const queueConfig = this.#queueConfigs.get(queue)
5959
const queueRetryConfig = queueConfig?.retry || {}
60+
const jobRetryConfig = this.#normalizeJobRetryConfig(jobOptions)
6061

6162
const maxRetries =
6263
jobRetryConfig?.maxRetries ??
@@ -95,4 +96,22 @@ export class QueueConfigResolver {
9596
getWorkerTimeout(): Duration | undefined {
9697
return this.#workerTimeout
9798
}
99+
100+
/**
 * Normalize job retry settings so that a top-level `maxRetries` participates
 * in the merge exactly like `retry.maxRetries`.
 */
#normalizeJobRetryConfig(jobOptions?: JobOptions): RetryConfig | undefined {
  if (!jobOptions) {
    return undefined
  }

  const { retry, maxRetries } = jobOptions

  // Nothing retry-related configured on the job: let queue/global settings win.
  if (retry === undefined && maxRetries === undefined) {
    return undefined
  }

  return {
    ...retry,
    // retry.maxRetries takes precedence over the top-level shorthand.
    maxRetries: retry?.maxRetries ?? maxRetries,
  }
}
98117
}

0 commit comments

Comments
 (0)