Skip to content

Commit b44f59a

Browse files
authored
Definitively fix race in periodic job enqueuer (#438)
Related to #409. I think I finally cracked it. For real this time. No workarounds, no hacks. If you look at the "after start" tests, they look roughly like this, start the client, then add some jobs: startService(t, svc) svc.Add( &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, ) svc.Add( &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, ) This would usually work as expected, with the enqueuer starting up, then jobs added, then those new jobs scheduled. _However_, what can happen is that the "after start" jobs are actually added in _before_ the enqueuer has finished entering its run loop. So, the service is starting up, makes a first call to `timeUntilNextRun`, and because the newly added periodic jobs have not been assigned a first run time, the code below would get a `timerUntilNextRun` of zero, which would cause it to enter the loop and run an insert iteration immediately, despite the jobs not actually being appropriate for scheduling as of yet: timerUntilNextRun := time.NewTimer(s.timeUntilNextRun()) for { select { case <-timerUntilNextRun.C: The fix is to modify `timeUntilNextRun` so that it ignores periodic jobs that have not yet been scheduled: for _, periodicJob := range s.periodicJobs { // Jobs may have been added after service start, but before this // function runs for the first time. They're not scheduled properly yet, // but they will be soon, at which point this function will run again. // Skip them for now. if periodicJob.nextRunAt.IsZero() { continue } Those jobs will be scheduled soon, then `timeUntilNextRun` called again, and the expected "time until" values returned. I'm confident enough in the fix that I've reverted most of #416 so the 5s jobs go back to 500ms. Fixes #409.
1 parent 79c093f commit b44f59a

2 files changed

Lines changed: 89 additions & 26 deletions

File tree

internal/maintenance/periodic_job_enqueuer.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
268268

269269
s.insertBatch(ctx, insertParamsMany, insertParamsUnique)
270270

271-
if len(insertParamsMany) > 0 {
271+
if len(insertParamsMany) > 0 || len(insertParamsUnique) > 0 {
272272
s.Logger.DebugContext(ctx, s.Name+": Inserted RunOnStart jobs", "num_jobs", len(insertParamsMany)+len(insertParamsUnique))
273273
}
274274
}
@@ -300,7 +300,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
300300
defer s.mu.RUnlock()
301301

302302
for _, periodicJob := range s.periodicJobs {
303-
if !periodicJob.nextRunAt.Before(nowWithMargin) {
303+
if periodicJob.nextRunAt.IsZero() || !periodicJob.nextRunAt.Before(nowWithMargin) {
304304
continue
305305
}
306306

@@ -405,6 +405,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
405405
s.Logger.ErrorContext(ctx, s.Name+": Error committing transaction", "error", err.Error())
406406
return
407407
}
408+
408409
s.TestSignals.InsertedJobs.Signal(struct{}{})
409410
}
410411

@@ -427,14 +428,16 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
427428
return insertParams, uniqueOpts, true
428429
}
429430

431+
const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour
432+
430433
func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
431434
s.mu.RLock()
432435
defer s.mu.RUnlock()
433436

434437
// With no configured jobs, just return a big duration for the loop to block
435438
// on.
436439
if len(s.periodicJobs) < 1 {
437-
return 24 * time.Hour
440+
return periodicJobEnqueuerVeryLongDuration
438441
}
439442

440443
var (
@@ -443,6 +446,14 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
443446
)
444447

445448
for _, periodicJob := range s.periodicJobs {
449+
// Jobs may have been added after service start, but before this
450+
// function runs for the first time. They're not scheduled properly yet,
451+
// but they will be soon, at which point this function will run again.
452+
// Skip them for now.
453+
if periodicJob.nextRunAt.IsZero() {
454+
continue
455+
}
456+
446457
// In case we detect a job that should've run before now, immediately short
447458
// circuit with a 0 duration. This avoids needlessly iterating through the
448459
// rest of the loop when we already know we're overdue for the next job.
@@ -455,5 +466,11 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
455466
}
456467
}
457468

469+
// Only encountered unscheduled jobs (see comment above). Don't schedule
470+
// anything for now.
471+
if firstNextRunAt.IsZero() {
472+
return periodicJobEnqueuerVeryLongDuration
473+
}
474+
458475
return firstNextRunAt.Sub(now)
459476
}

internal/maintenance/periodic_job_enqueuer_test.go

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -366,15 +366,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
366366
startService(t, svc)
367367

368368
svc.Add(
369-
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
369+
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
370370
)
371371
svc.Add(
372-
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
372+
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
373373
)
374374

375375
svc.TestSignals.InsertedJobs.WaitOrTimeout()
376-
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
377-
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
376+
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
377+
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
378378
})
379379

380380
t.Run("AddManyAfterStart", func(t *testing.T) {
@@ -385,13 +385,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
385385
startService(t, svc)
386386

387387
svc.AddMany([]*PeriodicJob{
388-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
389-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
388+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
389+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
390390
})
391391

392392
svc.TestSignals.InsertedJobs.WaitOrTimeout()
393-
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
394-
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
393+
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
394+
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
395395
})
396396

397397
t.Run("ClearAfterStart", func(t *testing.T) {
@@ -402,20 +402,20 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
402402
startService(t, svc)
403403

404404
handles := svc.AddMany([]*PeriodicJob{
405-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
406-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
405+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
406+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
407407
})
408408

409409
svc.TestSignals.InsertedJobs.WaitOrTimeout()
410-
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
411-
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
410+
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
411+
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
412412

413413
svc.Clear()
414414

415415
require.Empty(t, svc.periodicJobs)
416416

417417
handleAfterClear := svc.Add(
418-
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_new", false)},
418+
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_new", false)},
419419
)
420420

421421
// Handles are not reused.
@@ -431,13 +431,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
431431
startService(t, svc)
432432

433433
handles := svc.AddMany([]*PeriodicJob{
434-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
435-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
434+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
435+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
436436
})
437437

438438
svc.TestSignals.InsertedJobs.WaitOrTimeout()
439-
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
440-
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
439+
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
440+
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
441441

442442
svc.Remove(handles[1])
443443

@@ -452,15 +452,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
452452
startService(t, svc)
453453

454454
handles := svc.AddMany([]*PeriodicJob{
455-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
456-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_other", false)},
457-
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
455+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
456+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_other", false)},
457+
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
458458
})
459459

460460
svc.TestSignals.InsertedJobs.WaitOrTimeout()
461-
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
462-
requireNJobs(t, bundle.exec, "periodic_job_5s_other", 0)
463-
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
461+
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
462+
requireNJobs(t, bundle.exec, "periodic_job_500ms_other", 0)
463+
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
464464

465465
svc.RemoveMany([]rivertype.PeriodicJobHandle{handles[1], handles[2]})
466466

@@ -597,4 +597,50 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
597597
// Should be no jobs in the DB either:
598598
requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 0)
599599
})
600+
601+
t.Run("TimeUntilNextRun", func(t *testing.T) {
602+
t.Parallel()
603+
604+
svc, _ := setup(t)
605+
606+
now := svc.Time.StubNowUTC(time.Now())
607+
608+
// no jobs
609+
require.Equal(t, periodicJobEnqueuerVeryLongDuration, svc.timeUntilNextRun())
610+
611+
svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
612+
1: {nextRunAt: now.Add(2 * time.Hour)},
613+
2: {nextRunAt: now.Add(1 * time.Hour)},
614+
3: {nextRunAt: now.Add(3 * time.Hour)},
615+
}
616+
617+
// pick job with soonest next run
618+
require.Equal(t, 1*time.Hour, svc.timeUntilNextRun())
619+
620+
svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
621+
1: {nextRunAt: now.Add(2 * time.Hour)},
622+
2: {nextRunAt: now.Add(-1 * time.Hour)},
623+
3: {nextRunAt: now.Add(3 * time.Hour)},
624+
}
625+
626+
// job is already behind so time until next run is 0
627+
require.Equal(t, time.Duration(0), svc.timeUntilNextRun())
628+
629+
svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
630+
1: {},
631+
2: {},
632+
}
633+
634+
// jobs not scheduled yet
635+
require.Equal(t, periodicJobEnqueuerVeryLongDuration, svc.timeUntilNextRun())
636+
637+
svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
638+
1: {},
639+
2: {nextRunAt: now.Add(1 * time.Hour)},
640+
3: {},
641+
}
642+
643+
// pick job with soonest next run amongst some not scheduled yet
644+
require.Equal(t, 1*time.Hour, svc.timeUntilNextRun())
645+
})
600646
}

0 commit comments

Comments
 (0)