Skip to content

Commit 6543473

Browse files
authored
Rename baseservice.NowUTC to Now (#1215)
1 parent 4efc979 commit 6543473

28 files changed

Lines changed: 150 additions & 134 deletions

client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,9 +1423,9 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
14231423
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
14241424
return c.pilot.JobCancel(ctx, exec, &riverdriver.JobCancelParams{
14251425
ID: jobID,
1426-
CancelAttemptedAt: c.baseService.Time.NowUTC(),
1426+
CancelAttemptedAt: c.baseService.Time.Now(),
14271427
ControlTopic: string(notifier.NotificationTopicControl),
1428-
Now: c.baseService.Time.NowUTCOrNil(),
1428+
Now: c.baseService.Time.NowOrNil(),
14291429
Schema: c.config.Schema,
14301430
})
14311431
}
@@ -1504,7 +1504,7 @@ func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivert
15041504
func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, id int64) (*rivertype.JobRow, error) {
15051505
return c.pilot.JobRetry(ctx, exec, &riverdriver.JobRetryParams{
15061506
ID: id,
1507-
Now: c.baseService.Time.NowUTCOrNil(),
1507+
Now: c.baseService.Time.NowOrNil(),
15081508
Schema: c.config.Schema,
15091509
})
15101510
}
@@ -1624,7 +1624,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
16241624
// If the time is stubbed (in a test), use that for `created_at`. Otherwise,
16251625
// leave an empty value which will either use the database's `now()` or be defaulted
16261626
// by drivers as necessary.
1627-
createdAt := archetype.Time.NowUTCOrNil()
1627+
createdAt := archetype.Time.NowOrNil()
16281628

16291629
maxAttempts := cmp.Or(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts)
16301630
priority := cmp.Or(insertOpts.Priority, jobInsertOpts.Priority, rivercommon.PriorityDefault)
@@ -2554,7 +2554,7 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
25542554

25552555
if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{
25562556
Name: name,
2557-
Now: c.baseService.Time.NowUTCOrNil(),
2557+
Now: c.baseService.Time.NowOrNil(),
25582558
Schema: c.config.Schema,
25592559
}); err != nil {
25602560
return err
@@ -2590,7 +2590,7 @@ func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opt
25902590

25912591
if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{
25922592
Name: name,
2593-
Now: c.baseService.Time.NowUTCOrNil(),
2593+
Now: c.baseService.Time.NowOrNil(),
25942594
Schema: c.config.Schema,
25952595
}); err != nil {
25962596
return err
@@ -2624,7 +2624,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
26242624

26252625
if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{
26262626
Name: name,
2627-
Now: c.baseService.Time.NowUTCOrNil(),
2627+
Now: c.baseService.Time.NowOrNil(),
26282628
Schema: c.config.Schema,
26292629
}); err != nil {
26302630
return err
@@ -2661,7 +2661,7 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
26612661

26622662
if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{
26632663
Name: name,
2664-
Now: c.baseService.Time.NowUTCOrNil(),
2664+
Now: c.baseService.Time.NowOrNil(),
26652665
Schema: c.config.Schema,
26662666
}); err != nil {
26672667
return err

client_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6061,7 +6061,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
60616061

60626062
client := newTestClient(t, dbPool, config)
60636063

6064-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
6064+
now := client.baseService.Time.StubNow(time.Now().UTC())
60656065
t.Logf("Now: %s", now)
60666066

60676067
subscribeChan, cancel := client.Subscribe(EventKindJobCompleted, EventKindJobFailed)
@@ -6736,7 +6736,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
67366736
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
67376737
// Anchor all limiter checks to a known base time so debounce behavior is
67386738
// independent from scheduler jitter or machine load.
6739-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
6739+
now := client.baseService.Time.StubNow(time.Now().UTC())
67406740

67416741
type insertPayload struct {
67426742
Queue string `json:"queue"`
@@ -6771,7 +6771,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
67716771
// Keep time fixed inside the cooldown window before issuing repeated queue1
67726772
// inserts. This guarantees that all of these inserts are ineligible for a
67736773
// second notification regardless of wall-clock runtime.
6774-
client.baseService.Time.StubNowUTC(now.Add(500 * time.Millisecond))
6774+
client.baseService.Time.StubNow(now.Add(500 * time.Millisecond))
67756775

67766776
for range 5 {
67776777
config.Logger.InfoContext(ctx, "inserting queue1 job")
@@ -6791,7 +6791,7 @@ func Test_Client_InsertNotificationsAreDeduplicatedAndDebounced(t *testing.T) {
67916791
expectImmediateNotification(t, "queue3") // Immediate first fire on queue3
67926792

67936793
// `ShouldTrigger` uses a strict `Before` check; move just past the boundary.
6794-
client.baseService.Time.StubNowUTC(now.Add(config.FetchCooldown + time.Nanosecond))
6794+
client.baseService.Time.StubNow(now.Add(config.FetchCooldown + time.Nanosecond))
67956795

67966796
// Now queue1 should immediately notify again.
67976797
expectImmediateNotification(t, "queue1")
@@ -6846,7 +6846,7 @@ func Test_Client_JobCompletion(t *testing.T) {
68466846

68476847
client, bundle := setup(t, config)
68486848

6849-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
6849+
now := client.baseService.Time.StubNow(time.Now().UTC())
68506850

68516851
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
68526852
require.NoError(t, err)
@@ -6919,7 +6919,7 @@ func Test_Client_JobCompletion(t *testing.T) {
69196919

69206920
client, bundle := setup(t, config)
69216921

6922-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
6922+
now := client.baseService.Time.StubNow(time.Now().UTC())
69236923

69246924
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
69256925
require.NoError(t, err)
@@ -6951,7 +6951,7 @@ func Test_Client_JobCompletion(t *testing.T) {
69516951

69526952
client, bundle := setup(t, config)
69536953

6954-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
6954+
now := client.baseService.Time.StubNow(time.Now().UTC())
69556955

69566956
insertRes, err := client.Insert(ctx, JobArgs{}, nil)
69576957
require.NoError(t, err)
@@ -7013,7 +7013,7 @@ func Test_Client_JobCompletion(t *testing.T) {
70137013

70147014
client, bundle := setup(t, newTestConfig(t, ""))
70157015

7016-
now := client.baseService.Time.StubNowUTC(time.Now().UTC())
7016+
now := client.baseService.Time.StubNow(time.Now().UTC())
70177017

70187018
type JobArgs struct {
70197019
testutil.JobArgsReflectKind[JobArgs]
@@ -8117,7 +8117,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
81178117
t.Parallel()
81188118

81198119
archetype := riversharedtest.BaseServiceArchetype(t)
8120-
archetype.Time.StubNowUTC(time.Now().UTC())
8120+
archetype.Time.StubNow(time.Now().UTC())
81218121

81228122
uniqueOpts := UniqueOpts{
81238123
ByArgs: true,
@@ -8147,7 +8147,7 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
81478147
t.Parallel()
81488148

81498149
archetype := riversharedtest.BaseServiceArchetype(t)
8150-
archetype.Time.StubNowUTC(time.Now().UTC())
8150+
archetype.Time.StubNow(time.Now().UTC())
81518151

81528152
states := []rivertype.JobState{
81538153
rivertype.JobStateAvailable,
@@ -8465,7 +8465,7 @@ func TestUniqueOpts(t *testing.T) {
84658465
// the current time, but make sure it's nicened up a little to be
84668466
// roughly in the middle of the hour and well clear of any period
84678467
// boundaries.
8468-
client.baseService.Time.StubNowUTC(
8468+
client.baseService.Time.StubNow(
84698469
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond).UTC(),
84708470
)
84718471

internal/dbunique/db_unique.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func buildUniqueKeyString(timeGen rivertype.TimeGenerator, uniqueOpts *UniqueOpt
114114
}
115115

116116
if uniqueOpts.ByPeriod != time.Duration(0) {
117-
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod)
117+
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.Now).Truncate(uniqueOpts.ByPeriod)
118118
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
119119
}
120120

internal/dbunique/db_unique_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestUniqueKey(t *testing.T) {
2929
// Fixed timestamp for consistency across tests:
3030
now := time.Now().UTC()
3131
stubSvc := &riversharedtest.TimeStub{}
32-
stubSvc.StubNowUTC(now)
32+
stubSvc.StubNow(now)
3333

3434
tests := []struct {
3535
name string

internal/jobcompleter/job_completer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst
7878
c.wg.Add(1)
7979
defer c.wg.Done()
8080

81-
start := c.Time.NowUTC()
81+
start := c.Time.Now()
8282

8383
jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
84-
jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
84+
jobs, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowOrNil(), c.schema, params))
8585
if err != nil {
8686
return nil, err
8787
}
@@ -92,7 +92,7 @@ func (c *InlineCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobst
9292
return err
9393
}
9494

95-
stats.CompleteDuration = c.Time.NowUTC().Sub(start)
95+
stats.CompleteDuration = c.Time.Now().Sub(start)
9696
c.subscribeCh <- []CompleterJobUpdated{{
9797
Job: jobs[0],
9898
JobStats: stats,
@@ -185,11 +185,11 @@ func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, schema s
185185
func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
186186
// Start clock outside of goroutine so that the time spent blocking waiting
187187
// for an errgroup slot is accurately measured.
188-
start := c.Time.NowUTC()
188+
start := c.Time.Now()
189189

190190
c.errGroup.Go(func() error {
191191
jobs, err := withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
192-
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowUTCOrNil(), c.schema, params))
192+
rows, err := c.pilot.JobSetStateIfRunningMany(ctx, c.exec, setStateParamsToMany(c.Time.NowOrNil(), c.schema, params))
193193
if err != nil {
194194
return nil, err
195195
}
@@ -200,7 +200,7 @@ func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta
200200
return err
201201
}
202202

203-
stats.CompleteDuration = c.Time.NowUTC().Sub(start)
203+
stats.CompleteDuration = c.Time.Now().Sub(start)
204204
c.subscribeCh <- []CompleterJobUpdated{{
205205
Job: jobs[0],
206206
JobStats: stats,
@@ -479,7 +479,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
479479
events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated {
480480
setState := setStateBatch[jobRow.ID]
481481
startTime := setStateStartTimes[jobRow.ID]
482-
setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime)
482+
setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime)
483483
return CompleterJobUpdated{
484484
Job: jobRow,
485485
JobStats: setState.Stats,
@@ -504,7 +504,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
504504
}
505505

506506
func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
507-
now := c.Time.NowUTC()
507+
now := c.Time.Now()
508508
// If we've built up too much of a backlog because the completer's fallen
509509
// behind, block completions until the complete loop's had a chance to catch
510510
// up.

internal/jobexecutor/job_executor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (e *JobExecutor) Execute(ctx context.Context) {
138138
// Ensure that the context is cancelled no matter what, or it will leak:
139139
defer e.CancelFunc(errExecutorDefaultCancel)
140140

141-
e.start = e.Time.NowUTC()
141+
e.start = e.Time.Now()
142142
e.stats = &jobstats.JobStatistics{
143143
QueueWaitDuration: e.start.Sub(e.JobRow.ScheduledAt),
144144
}
@@ -196,7 +196,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
196196
PanicVal: recovery,
197197
}
198198
}
199-
e.stats.RunDuration = e.Time.NowUTC().Sub(e.start)
199+
e.stats.RunDuration = e.Time.Now().Sub(e.start)
200200
}()
201201

202202
if e.WorkUnit == nil {
@@ -385,7 +385,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
385385
// so we instead make the job immediately `available` if the snooze time is
386386
// smaller than the scheduler's run interval.
387387
var params *riverdriver.JobSetStateIfRunningParams
388-
if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
388+
if nextAttemptScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval {
389389
params = riverdriver.JobSetStateSnoozedAvailable(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes)
390390
} else {
391391
params = riverdriver.JobSetStateSnoozed(jobRow.ID, nextAttemptScheduledAt, jobRow.Attempt-1, metadataUpdatesBytes)
@@ -409,7 +409,7 @@ func (e *JobExecutor) reportResult(ctx context.Context, jobRow *rivertype.JobRow
409409
return
410410
}
411411

412-
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.NowUTC(), metadataUpdatesBytes)); err != nil {
412+
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(jobRow.ID, e.Time.Now(), metadataUpdatesBytes)); err != nil {
413413
e.Logger.ErrorContext(ctx, e.Name+": Error completing job",
414414
slog.String("err", err.Error()),
415415
slog.Int64("job_id", jobRow.ID),
@@ -462,7 +462,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
462462
return
463463
}
464464

465-
now := e.Time.NowUTC()
465+
now := e.Time.Now()
466466

467467
if cancelJob {
468468
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(jobRow.ID, now, errData, metadataUpdates)); err != nil {
@@ -502,7 +502,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
502502
// respected. Here, we offset that with a branch that makes jobs immediately
503503
// `available` if their retry was smaller than the scheduler's run interval.
504504
var params *riverdriver.JobSetStateIfRunningParams
505-
if nextRetryScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
505+
if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval {
506506
params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)
507507
} else {
508508
params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)

internal/jobexecutor/job_executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestJobExecutor_Execute(t *testing.T) {
251251

252252
executor, bundle := setup(t)
253253

254-
now := executor.Time.StubNowUTC(time.Now().UTC())
254+
now := executor.Time.StubNow(time.Now().UTC())
255255

256256
workerErr := errors.New("job error")
257257
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow)

internal/leadership/elector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
213213

214214
elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
215215
LeaderID: e.config.ClientID,
216-
Now: e.Time.NowUTCOrNil(),
216+
Now: e.Time.NowOrNil(),
217217
Schema: e.config.Schema,
218218
TTL: e.leaderTTL(),
219219
})
@@ -391,7 +391,7 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
391391

392392
reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
393393
LeaderID: e.config.ClientID,
394-
Now: e.Time.NowUTCOrNil(),
394+
Now: e.Time.NowOrNil(),
395395
Schema: e.config.Schema,
396396
TTL: e.leaderTTL(),
397397
})

internal/maintenance/job_scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
174174
}
175175
defer dbutil.RollbackWithoutCancel(ctx, execTx)
176176

177-
now := s.Time.NowUTC()
177+
now := s.Time.Now()
178178
nowWithLookAhead := now.Add(s.config.Interval)
179179

180180
scheduledJobResults, err := execTx.JobSchedule(ctx, &riverdriver.JobScheduleParams{
@@ -194,7 +194,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
194194
// slightly in the future (this loop, the notify, and tx commit will take
195195
// a small amount of time). This isn't going to be perfect, but the goal
196196
// is to roughly try to guess when the clients will attempt to fetch jobs.
197-
notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond)
197+
notificationHorizon := s.Time.Now().Add(5 * time.Millisecond)
198198

199199
for _, result := range scheduledJobResults {
200200
if result.Job.ScheduledAt.After(notificationHorizon) {

internal/maintenance/periodic_job_enqueuer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
388388

389389
var (
390390
insertParamsMany []*rivertype.JobInsertParams
391-
now = s.Time.NowUTC()
391+
now = s.Time.Now()
392392
periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema}
393393
)
394394

@@ -417,7 +417,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
417417
periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{
418418
ID: periodicJob.ID,
419419
NextRunAt: periodicJob.nextRunAt,
420-
UpdatedAt: s.Time.NowUTC(),
420+
UpdatedAt: s.Time.Now(),
421421
})
422422
}
423423

@@ -452,7 +452,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
452452
periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema}
453453
)
454454

455-
now := s.Time.NowUTC()
455+
now := s.Time.Now()
456456

457457
// Add a small margin to the current time so we're not only
458458
// running jobs that are already ready, but also ones ready at
@@ -482,7 +482,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
482482
periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{
483483
ID: periodicJob.ID,
484484
NextRunAt: periodicJob.nextRunAt,
485-
UpdatedAt: s.Time.NowUTC(),
485+
UpdatedAt: s.Time.Now(),
486486
})
487487
}
488488
}
@@ -642,7 +642,7 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
642642

643643
var (
644644
firstNextRunAt time.Time
645-
now = s.Time.NowUTC()
645+
now = s.Time.Now()
646646
)
647647

648648
for _, periodicJob := range s.periodicJobs {

0 commit comments

Comments (0)