-
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathadapter.ts
More file actions
266 lines (241 loc) · 7.62 KB
/
adapter.ts
File metadata and controls
266 lines (241 loc) · 7.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import type {
JobData,
JobRecord,
JobRetention,
ScheduleConfig,
ScheduleData,
ScheduleListOptions,
} from '../types/main.js'
/**
* A job that has been acquired by a worker for processing.
* Extends JobData with the timestamp when the job was acquired.
*/
export interface AcquiredJob extends JobData {
/** Timestamp (in ms) when the job was acquired by the worker */
acquiredAt: number
}
/**
* Adapter interface for queue storage backends.
*
* Implementations handle job persistence, atomic operations, and
* concurrency control. Built-in adapters: Redis, Knex (PostgreSQL/SQLite).
*
* @example
* ```typescript
* import { redis } from '@boringnode/queue'
*
* const config = {
* default: 'redis',
* adapters: {
* redis: redis({ host: 'localhost', port: 6379 })
* }
* }
* ```
*/
export interface Adapter {
/**
* Set the worker ID for this adapter instance.
* Required before calling pop methods when consuming jobs.
*
* @param workerId - Unique identifier for the worker
*/
setWorkerId(workerId: string): void
/**
* Pop the next available job from the default queue.
* Atomically moves the job from pending to active state.
*
* @returns The acquired job, or null if queue is empty
*/
pop(): Promise<AcquiredJob | null>
/**
* Pop the next available job from a specific queue.
* Atomically moves the job from pending to active state.
*
* @param queue - The queue name to pop from
* @returns The acquired job, or null if queue is empty
*/
popFrom(queue: string): Promise<AcquiredJob | null>
/**
* Recover stalled jobs that have been active for too long.
* A stalled job is one where the worker stopped responding (e.g., crash).
*
* Jobs within maxStalledCount are moved back to pending.
* Jobs exceeding maxStalledCount are failed permanently.
*
* @param queue - The queue to check for stalled jobs
* @param stalledThreshold - Duration in ms after which a job is considered stalled
* @param maxStalledCount - Maximum times a job can be recovered before failing
* @returns Number of jobs that were recovered (not including permanently failed ones)
*/
recoverStalledJobs(
queue: string,
stalledThreshold: number,
maxStalledCount: number
): Promise<number>
/**
* Mark a job as completed and remove it from the queue.
*
* @param jobId - The job ID to complete
* @param queue - The queue the job belongs to
* @param removeOnComplete - Optional retention policy for completed jobs
* @param output - Optional output returned by the job
*/
completeJob(
jobId: string,
queue: string,
removeOnComplete?: JobRetention,
output?: any
): Promise<void>
/**
* Mark a job as failed permanently and remove it from the queue.
*
* @param jobId - The job ID to fail
* @param queue - The queue the job belongs to
* @param error - Optional error that caused the failure
* @param removeOnFail - Optional retention policy for failed jobs
*/
failJob(jobId: string, queue: string, error?: Error, removeOnFail?: JobRetention): Promise<void>
/**
* Retry a job by moving it back to pending with incremented attempts.
*
* @param jobId - The job ID to retry
* @param queue - The queue the job belongs to
* @param retryAt - Optional future date to delay the retry
*/
retryJob(jobId: string, queue: string, retryAt?: Date): Promise<void>
/**
* Get a job record by id.
*
* @param jobId - The job ID to retrieve
* @param queue - The queue the job belongs to
* @returns The job record, or null if not found
*/
getJob(jobId: string, queue: string): Promise<JobRecord | null>
/**
* Push a job to the default queue for immediate processing.
*
* @param jobData - The job data to push
*/
push(jobData: JobData): Promise<void>
/**
* Push a job to a specific queue for immediate processing.
*
* @param queue - The queue name to push to
* @param jobData - The job data to push
*/
pushOn(queue: string, jobData: JobData): Promise<void>
/**
* Push a job to the default queue with a delay.
*
* @param jobData - The job data to push
* @param delay - Delay in milliseconds before the job becomes available
*/
pushLater(jobData: JobData, delay: number): Promise<void>
/**
* Push a job to a specific queue with a delay.
*
* @param queue - The queue name to push to
* @param jobData - The job data to push
* @param delay - Delay in milliseconds before the job becomes available
*/
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
/**
* Push multiple jobs to the default queue for immediate processing.
*
* This is more efficient than calling push() multiple times as it
* batches the operations (e.g., Redis pipeline, SQL batch insert).
*
* @param jobs - Array of job data to push
*/
pushMany(jobs: JobData[]): Promise<void>
/**
* Push multiple jobs to a specific queue for immediate processing.
*
* This is more efficient than calling pushOn() multiple times as it
* batches the operations (e.g., Redis pipeline, SQL batch insert).
*
* @param queue - The queue name to push to
* @param jobs - Array of job data to push
*/
pushManyOn(queue: string, jobs: JobData[]): Promise<void>
/**
* Get the number of pending jobs in the default queue.
*
* @returns The number of pending jobs
*/
size(): Promise<number>
/**
* Get the number of pending jobs in a specific queue.
*
* @param queue - The queue name to check
* @returns The number of pending jobs
*/
sizeOf(queue: string): Promise<number>
/**
* Clean up resources (close connections, etc.).
* Called when the worker stops or the adapter is no longer needed.
*/
destroy(): Promise<void>
/**
* Create or update a schedule.
*
* If a schedule with the given id exists, it will be updated (upsert).
* Otherwise, a new schedule is created.
*
* @param config - The schedule configuration
* @returns The schedule ID
*/
upsertSchedule(config: ScheduleConfig): Promise<string>
/**
* Create or update a schedule.
*
* @deprecated Use `upsertSchedule` instead.
* @param config - The schedule configuration
* @returns The schedule ID
*/
createSchedule(config: ScheduleConfig): Promise<string>
/**
* Get a schedule by ID.
*
* @param id - The schedule ID
* @returns The schedule data, or null if not found
*/
getSchedule(id: string): Promise<ScheduleData | null>
/**
* List all schedules matching the given options.
*
* @param options - Optional filters for listing
* @returns Array of schedule data
*/
listSchedules(options?: ScheduleListOptions): Promise<ScheduleData[]>
/**
* Update a schedule's status or run metadata.
*
* @param id - The schedule ID
* @param updates - The fields to update
*/
updateSchedule(
id: string,
updates: Partial<Pick<ScheduleData, 'status' | 'nextRunAt' | 'lastRunAt' | 'runCount'>>
): Promise<void>
/**
* Delete a schedule permanently.
*
* @param id - The schedule ID to delete
*/
deleteSchedule(id: string): Promise<void>
/**
* Atomically claim a due schedule for execution.
*
* This method:
* 1. Finds ONE schedule where nextRunAt <= now AND status = 'active'
* 2. Calculates and updates its nextRunAt to the next occurrence
* 3. Increments runCount and sets lastRunAt
* 4. Returns the schedule data for job dispatching
*
* The atomic nature prevents multiple workers from claiming the same schedule.
*
* @returns The claimed schedule, or null if no schedules are due
*/
claimDueSchedule(): Promise<ScheduleData | null>
}