
Commit 793f370

Consider per-worker timeout overrides when rescuing jobs (#350)
This one came up while I was thinking about the job-specific rescue threshold floated in [1]. I was going to suggest a possible workaround: set an aggressive rescue threshold combined with a low global job timeout, then override the timeout on any specific job workers that need to run longer than the new low global timeout. But then I realized this wouldn't work, because the job rescuer doesn't account for job-specific timeouts -- it just rescues or discards everything it finds beyond the run's rescue threshold.

Here, add new logic to address that problem. Luckily we were already pulling worker information to look for a possible custom retry schedule, so we just have to piggyback onto that to also examine a possible custom work timeout.

[1] #347
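For context, the workaround described above maps onto River's public API roughly as follows. This is a minimal sketch assuming the client-level `JobTimeout` and `RescueStuckJobsAfter` config options and the per-worker `Timeout` override; the worker and args names (`SlowReportWorker`, `SlowReportArgs`, `exampleConfig`) are hypothetical:

```go
package example

import (
	"context"
	"time"

	"github.com/riverqueue/river"
)

type SlowReportArgs struct{}

func (SlowReportArgs) Kind() string { return "slow_report" }

// SlowReportWorker is a hypothetical worker whose jobs legitimately run far
// longer than the client-wide timeout. Overriding Timeout is the per-worker
// escape hatch; with this change the rescuer consults it too, so these jobs
// are no longer rescued or discarded prematurely.
type SlowReportWorker struct {
	river.WorkerDefaults[SlowReportArgs]
}

func (w *SlowReportWorker) Timeout(job *river.Job[SlowReportArgs]) time.Duration {
	return 4 * time.Hour // allowed to run much longer than the global timeout
}

func (w *SlowReportWorker) Work(ctx context.Context, job *river.Job[SlowReportArgs]) error {
	// ... long-running work ...
	return nil
}

func exampleConfig(workers *river.Workers) *river.Config {
	return &river.Config{
		// Aggressive global settings: most jobs time out after a minute and
		// become eligible for rescue shortly after.
		JobTimeout:           time.Minute,
		RescueStuckJobsAfter: 5 * time.Minute,
		Workers:              workers,
	}
}
```

With this commit, the rescuer checks the worker's timeout before deciding, so `SlowReportWorker` jobs are only rescued once their own four-hour budget has elapsed rather than at the global rescue threshold.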
1 parent 4def8bc commit 793f370

3 files changed

Lines changed: 112 additions & 54 deletions


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- River now considers per-worker timeout overrides when rescuing jobs so that jobs with a long custom timeout won't be rescued prematurely. [PR #350](https://github.com/riverqueue/river/pull/350).
+
 ## [0.6.0] - 2024-05-08
 
 ### Added
```

internal/maintenance/job_rescuer.go

Lines changed: 57 additions & 27 deletions
```diff
@@ -11,6 +11,7 @@ import (
 	"github.com/riverqueue/river/internal/baseservice"
 	"github.com/riverqueue/river/internal/maintenance/startstop"
 	"github.com/riverqueue/river/internal/rivercommon"
+	"github.com/riverqueue/river/internal/util/ptrutil"
 	"github.com/riverqueue/river/internal/util/timeutil"
 	"github.com/riverqueue/river/internal/util/valutil"
 	"github.com/riverqueue/river/internal/workunit"
@@ -164,22 +165,20 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
 	now := time.Now().UTC()
 
 	rescueManyParams := riverdriver.JobRescueManyParams{
-		ID:          make([]int64, len(stuckJobs)),
-		Error:       make([][]byte, len(stuckJobs)),
-		FinalizedAt: make([]time.Time, len(stuckJobs)),
-		ScheduledAt: make([]time.Time, len(stuckJobs)),
-		State:       make([]string, len(stuckJobs)),
+		ID:          make([]int64, 0, len(stuckJobs)),
+		Error:       make([][]byte, 0, len(stuckJobs)),
+		FinalizedAt: make([]time.Time, 0, len(stuckJobs)),
+		ScheduledAt: make([]time.Time, 0, len(stuckJobs)),
+		State:       make([]string, 0, len(stuckJobs)),
 	}
 
-	for i, job := range stuckJobs {
-		rescueManyParams.ID[i] = job.ID
-
+	for _, job := range stuckJobs {
 		var metadata metadataWithCancelAttemptedAt
 		if err := json.Unmarshal(job.Metadata, &metadata); err != nil {
 			return nil, fmt.Errorf("error unmarshaling job metadata: %w", err)
 		}
 
-		rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{
+		errorData, err := json.Marshal(rivertype.AttemptError{
 			At:      now,
 			Attempt: max(job.Attempt, 0),
 			Error:   "Stuck job rescued by Rescuer",
@@ -189,29 +188,41 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
 			return nil, fmt.Errorf("error marshaling error JSON: %w", err)
 		}
 
+		addRescueParam := func(state rivertype.JobState, finalizedAt *time.Time, scheduledAt time.Time) {
+			rescueManyParams.ID = append(rescueManyParams.ID, job.ID)
+			rescueManyParams.Error = append(rescueManyParams.Error, errorData)
+			rescueManyParams.FinalizedAt = append(rescueManyParams.FinalizedAt, ptrutil.ValOrDefault(finalizedAt, time.Time{}))
+			rescueManyParams.ScheduledAt = append(rescueManyParams.ScheduledAt, scheduledAt)
+			rescueManyParams.State = append(rescueManyParams.State, string(state))
+		}
+
 		if !metadata.CancelAttemptedAt.IsZero() {
 			res.NumJobsCancelled++
-			rescueManyParams.FinalizedAt[i] = now
-			rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-			rescueManyParams.State[i] = string(rivertype.JobStateCancelled)
+			addRescueParam(rivertype.JobStateCancelled, &now, job.ScheduledAt) // reused previous scheduled value
 			continue
 		}
-		shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
-		if shouldRetry {
-			res.NumJobsRetried++
-			rescueManyParams.ScheduledAt[i] = retryAt
-			rescueManyParams.State[i] = string(rivertype.JobStateRetryable)
-		} else {
+
+		retryDecision, retryAt := s.makeRetryDecision(ctx, job, now)
+
+		switch retryDecision {
+		case jobRetryDecisionDiscard:
 			res.NumJobsDiscarded++
-			rescueManyParams.FinalizedAt[i] = now
-			rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-			rescueManyParams.State[i] = string(rivertype.JobStateDiscarded)
+			addRescueParam(rivertype.JobStateDiscarded, &now, job.ScheduledAt) // reused previous scheduled value
+
+		case jobRetryDecisionIgnore:
+			// job not timed out yet due to kind-specific timeout value; ignore
+
+		case jobRetryDecisionRetry:
+			res.NumJobsRetried++
+			addRescueParam(rivertype.JobStateRetryable, nil, retryAt)
 		}
 	}
 
-	_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
-	if err != nil {
-		return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+	if len(rescueManyParams.ID) > 0 {
+		_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
+		if err != nil {
+			return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+		}
 	}
 
 	s.TestSignals.UpdatedBatch.Signal(struct{}{})
@@ -245,14 +256,24 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
 	})
 }
 
+// jobRetryDecision is a signal from makeRetryDecision as to what to do with a
+// particular job that appears to be eligible for rescue.
+type jobRetryDecision int
+
+const (
+	jobRetryDecisionDiscard jobRetryDecision = iota // discard the job
+	jobRetryDecisionIgnore                          // don't retry or discard the job
+	jobRetryDecisionRetry                           // retry the job
+)
+
 // makeRetryDecision decides whether or not a rescued job should be retried, and if so,
 // when.
-func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow) (bool, time.Time) {
+func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
 	workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
 	if workUnitFactory == nil {
 		s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
 			slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
-		return false, time.Time{}
+		return jobRetryDecisionDiscard, time.Time{}
 	}
 
 	workUnit := workUnitFactory.MakeUnit(job)
@@ -261,9 +282,18 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
 			slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
 	}
 
+	if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
+		return jobRetryDecisionIgnore, time.Time{}
+	}
+
 	nextRetry := workUnit.NextRetry()
 	if nextRetry.IsZero() {
 		nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
	}
-	return job.Attempt < max(job.MaxAttempts, 0), nextRetry
+
+	if job.Attempt < max(job.MaxAttempts, 0) {
+		return jobRetryDecisionRetry, nextRetry
+	}
+
+	return jobRetryDecisionDiscard, time.Time{}
 }
```
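The heart of the change is the new `jobRetryDecisionIgnore` path above: a job that looks stuck by the global rescue horizon is left alone if its worker declares a longer timeout that hasn't yet elapsed. A standalone sketch of that rule, with simplified, hypothetical names rather than River's internal types:

```go
package main

import (
	"fmt"
	"time"
)

// decision mirrors the rescuer's three possible outcomes (hypothetical,
// simplified names for illustration only).
type decision string

const (
	decisionDiscard decision = "discard"
	decisionIgnore  decision = "ignore"
	decisionRetry   decision = "retry"
)

// decide sketches the rule added in this commit: a job that appears stuck by
// the global rescue horizon is ignored if its worker declares a non-zero
// timeout that hasn't elapsed; otherwise it's retried while attempts remain,
// and discarded once they're exhausted.
func decide(now, attemptedAt time.Time, workerTimeout time.Duration, attempt, maxAttempts int) decision {
	if workerTimeout != 0 && now.Sub(attemptedAt) < workerTimeout {
		return decisionIgnore // not actually stuck yet
	}
	if attempt < maxAttempts {
		return decisionRetry
	}
	return decisionDiscard
}

func main() {
	now := time.Now()
	// Started 30 minutes ago with a 1h worker timeout: still running, ignore.
	fmt.Println(decide(now, now.Add(-30*time.Minute), time.Hour, 1, 5))
	// Started 2 hours ago with the same timeout: rescue and retry.
	fmt.Println(decide(now, now.Add(-2*time.Hour), time.Hour, 1, 5))
}
```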

internal/maintenance/job_rescuer_test.go

Lines changed: 51 additions & 27 deletions
```diff
@@ -23,20 +23,22 @@ import (
 // callbackWorkUnitFactory wraps a Worker to implement workUnitFactory.
 type callbackWorkUnitFactory struct {
 	Callback func(ctx context.Context, jobRow *rivertype.JobRow) error
+	timeout  time.Duration // defaults to 0, which signals default timeout
 }
 
 func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
-	return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow}
+	return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow, timeout: w.timeout}
 }
 
 // callbackWorkUnit implements workUnit for a job and Worker.
 type callbackWorkUnit struct {
 	callback func(ctx context.Context, jobRow *rivertype.JobRow) error
 	jobRow   *rivertype.JobRow
+	timeout  time.Duration // defaults to 0, which signals default timeout
 }
 
 func (w *callbackWorkUnit) NextRetry() time.Time           { return time.Now().Add(30 * time.Second) }
-func (w *callbackWorkUnit) Timeout() time.Duration         { return 0 }
+func (w *callbackWorkUnit) Timeout() time.Duration         { return w.timeout }
 func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) }
 func (w *callbackWorkUnit) UnmarshalJob() error            { return nil }
 
@@ -51,10 +53,13 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 func TestJobRescuer(t *testing.T) {
 	t.Parallel()
 
-	const rescuerJobKind = "rescuer"
-
 	ctx := context.Background()
 
+	const (
+		rescuerJobKind            = "rescuer"
+		rescuerJobKindLongTimeout = "rescuer_long_timeout"
+	)
+
 	type testBundle struct {
 		exec          riverdriver.Executor
 		rescueHorizon time.Time
@@ -76,8 +81,13 @@ func TestJobRescuer(t *testing.T) {
 			Interval:    JobRescuerIntervalDefault,
 			RescueAfter: JobRescuerRescueAfterDefault,
 			WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
-				if kind == rescuerJobKind {
-					return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }}
+				emptyCallback := func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }
+
+				switch kind {
+				case rescuerJobKind:
+					return &callbackWorkUnitFactory{Callback: emptyCallback}
+				case rescuerJobKindLongTimeout:
+					return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
 				}
 				panic("unhandled kind: " + kind)
 			},
@@ -135,11 +145,18 @@ func TestJobRescuer(t *testing.T) {
 		stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
 		stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued
 
-		// these aren't touched:
+		// these aren't touched because they're in ineligible states
 		notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 		notRunningJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateDiscarded), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 		notRunningJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCancelled), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
 
+		// Jobs with worker-specific long timeouts. The first isn't rescued
+		// because the difference between its `attempted_at` and now is still
+		// within the timeout threshold. The second _is_ rescued because it
+		// started earlier and even with the longer timeout, has still timed out.
+		longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+		longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+
 		require.NoError(cleaner.Start(ctx))
 
 		cleaner.TestSignals.FetchedBatch.WaitOrTimeout()
@@ -158,37 +175,44 @@ func TestJobRescuer(t *testing.T) {
 		require.NoError(err)
 		require.Equal(stuckToRetryJob3.State, job3After.State) // not rescued
 
-		discard1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
+		discardJob1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateDiscarded, discard1After.State)
-		require.WithinDuration(time.Now(), *discard1After.FinalizedAt, 5*time.Second)
-		require.Len(discard1After.Errors, 1)
+		require.Equal(rivertype.JobStateDiscarded, discardJob1After.State)
+		require.WithinDuration(time.Now(), *discardJob1After.FinalizedAt, 5*time.Second)
+		require.Len(discardJob1After.Errors, 1)
 
-		discard2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
+		discardJob2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateRunning, discard2After.State)
-		require.Nil(discard2After.FinalizedAt)
+		require.Equal(rivertype.JobStateRunning, discardJob2After.State)
+		require.Nil(discardJob2After.FinalizedAt)
 
-		cancel1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
+		cancelJob1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateCancelled, cancel1After.State)
-		require.WithinDuration(time.Now(), *cancel1After.FinalizedAt, 5*time.Second)
-		require.Len(cancel1After.Errors, 1)
+		require.Equal(rivertype.JobStateCancelled, cancelJob1After.State)
+		require.WithinDuration(time.Now(), *cancelJob1After.FinalizedAt, 5*time.Second)
+		require.Len(cancelJob1After.Errors, 1)
 
-		cancel2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
+		cancelJob2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
 		require.NoError(err)
-		require.Equal(rivertype.JobStateRunning, cancel2After.State)
-		require.Nil(cancel2After.FinalizedAt)
+		require.Equal(rivertype.JobStateRunning, cancelJob2After.State)
+		require.Nil(cancelJob2After.FinalizedAt)
 
-		notRunning1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
+		notRunningJob1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
 		require.NoError(err)
-		require.Equal(notRunning1After.State, notRunningJob1.State)
-		notRunning2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+		require.Equal(notRunningJob1.State, notRunningJob1After.State)
+		notRunningJob2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+		require.NoError(err)
+		require.Equal(notRunningJob2.State, notRunningJob2After.State)
+		notRunningJob3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+		require.NoError(err)
+		require.Equal(notRunningJob3.State, notRunningJob3After.State)
+
+		notTimedOutJob1After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob1.ID)
 		require.NoError(err)
-		require.Equal(notRunning2After.State, notRunningJob2.State)
-		notRunning3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+		require.Equal(rivertype.JobStateRunning, notTimedOutJob1After.State)
+		notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob2.ID)
 		require.NoError(err)
-		require.Equal(notRunning3After.State, notRunningJob3.State)
+		require.Equal(rivertype.JobStateRetryable, notTimedOutJob2After.State)
 	})
 
 	t.Run("RescuesInBatches", func(t *testing.T) {
```
