Skip to content

Commit 2b5c7e6

Browse files
rkannan82claude
andauthored
Expose thread-safe Executable.Attempt() on the interface (#9924)
## What Re-adds `Attempt()` to the `Executable` interface, backed by `atomic.Int64` so all access is thread-safe without needing the mutex. ## Why PR #8732 moved `attempt` out of the mutex and removed `Attempt()` from the interface as a simplification. An upcoming PR needs to read `Attempt()` from the executor (cross-goroutine), which requires thread-safe access. ## How did you test it? Unit tests: `go test ./service/history/queues/ -run Executable -count=1` — all passing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5b92309 commit 2b5c7e6

2 files changed

Lines changed: 38 additions & 14 deletions

File tree

service/history/queues/executable.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"regexp"
1212
"runtime/debug"
1313
"sync"
14+
"sync/atomic"
1415
"time"
1516

1617
"go.opentelemetry.io/otel/attribute"
@@ -43,6 +44,7 @@ type (
4344
ctasks.Task
4445
tasks.Task
4546

47+
Attempt() int
4648
GetTask() tasks.Task
4749
GetPriority() ctasks.Priority
4850
GetScheduledTime() time.Time
@@ -123,7 +125,7 @@ type (
123125
dlqWriter *DLQWriter
124126

125127
readerID int64
126-
attempt int
128+
attempt atomic.Int64
127129
priority ctasks.Priority
128130
scheduledTime time.Time
129131
scheduleLatency time.Duration
@@ -190,7 +192,6 @@ func NewExecutable(
190192
Task: task,
191193
state: ctasks.TaskStatePending,
192194

193-
attempt: 1,
194195
executor: executor,
195196
scheduler: scheduler,
196197
rescheduler: rescheduler,
@@ -216,6 +217,7 @@ func NewExecutable(
216217
dlqErrorPattern: params.DLQErrorPattern,
217218
}
218219
e.refreshMetricsHandlers(nil)
220+
e.attempt.Store(1)
219221
e.priority = priorityAssigner.Assign(e)
220222

221223
loadTime := util.MaxTime(timeSource.Now(), task.GetKey().FireTime)
@@ -349,7 +351,7 @@ func (e *executableImpl) Execute() (retErr error) {
349351
// namespace did a failover,
350352
// reset task attempt since the execution logic used will change
351353
// reset task priority since it changes between active/standby
352-
e.attempt = 1
354+
e.resetAttempt()
353355
e.priority = e.priorityAssigner.Assign(e)
354356
}
355357
e.lastActiveness = resp.ExecutedAsActive
@@ -521,7 +523,7 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
521523
if e.isInvalidTaskError(err) {
522524
// only consider task invalid if it's the first attempt
523525
// otherwise we have no idea if it's invalid due to the (failed) write operation in previous attempts.
524-
e.invalidTask = e.attempt == 1
526+
e.invalidTask = e.attempt.Load() == 1
525527
return nil
526528
}
527529

@@ -541,12 +543,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
541543
logger := log.With(e.logger,
542544
tag.Error(err),
543545
tag.ErrorType(err),
544-
tag.Attempt(int32(e.attempt)),
546+
tag.Attempt(int32(e.attempt.Load())),
545547
tag.UnexpectedErrorAttempts(int32(e.unexpectedErrorAttempts)),
546548
tag.LifeCycleProcessingFailed,
547549
tag.String("task-category", e.GetCategory().Name()),
548550
)
549-
if e.attempt > taskCriticalLogMetricAttempts {
551+
if e.attempt.Load() > taskCriticalLogMetricAttempts {
550552
logger.Error("Critical error processing task, retrying.", tag.OperationCritical)
551553
} else {
552554
logger.Warn("Fail to process task")
@@ -653,7 +655,7 @@ func (e *executableImpl) Ack() {
653655
return
654656
}
655657

656-
metrics.TaskAttempt.With(e.chasmMetricsHandler).Record(int64(e.attempt))
658+
metrics.TaskAttempt.With(e.chasmMetricsHandler).Record(e.attempt.Load())
657659

658660
priorityTaggedProvider := e.chasmMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String()))
659661
metrics.TaskLatency.With(priorityTaggedProvider).Record(e.inMemoryNoUserLatency)
@@ -714,6 +716,10 @@ func (e *executableImpl) State() ctasks.State {
714716
return e.state
715717
}
716718

719+
func (e *executableImpl) Attempt() int {
720+
return int(e.attempt.Load())
721+
}
722+
717723
func (e *executableImpl) GetPriority() ctasks.Priority {
718724
return e.priority
719725
}
@@ -750,7 +756,7 @@ func (e *executableImpl) shouldResubmitOnNack(err error) bool {
750756
// this is an optimization for skipping rescheduler and retry the task sooner.
751757
// this is useful for errors like workflow busy, which doesn't have to wait for
752758
// the longer rescheduling backoff.
753-
if e.attempt > resubmitMaxAttempts {
759+
if e.attempt.Load() > resubmitMaxAttempts {
754760
return false
755761
}
756762

@@ -784,14 +790,14 @@ func (e *executableImpl) backoffDuration(
784790
common.IsInternalError(err) {
785791
// using a different reschedule policy to slow down retry
786792
// as immediate retry typically won't resolve the issue.
787-
return taskNotReadyReschedulePolicy.ComputeNextDelay(0, e.attempt, err)
793+
return taskNotReadyReschedulePolicy.ComputeNextDelay(0, int(e.attempt.Load()), err)
788794
}
789795

790796
if err == consts.ErrDependencyTaskNotCompleted {
791-
return dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, e.attempt, err)
797+
return dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, int(e.attempt.Load()), err)
792798
}
793799

794-
backoffDuration := reschedulePolicy.ComputeNextDelay(0, e.attempt, err)
800+
backoffDuration := reschedulePolicy.ComputeNextDelay(0, int(e.attempt.Load()), err)
795801
if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) && common.IsResourceExhausted(err) {
796802
// try a different reschedule policy to slow down retry
797803
// upon system resource exhausted error and pick the longer backoff duration
@@ -805,13 +811,17 @@ func (e *executableImpl) backoffDuration(
805811
}
806812

807813
func (e *executableImpl) incAttempt() {
808-
e.attempt++
814+
attempt := e.attempt.Add(1)
809815

810-
if e.attempt > taskCriticalLogMetricAttempts {
811-
metrics.TaskAttempt.With(e.chasmMetricsHandler).Record(int64(e.attempt))
816+
if attempt > taskCriticalLogMetricAttempts {
817+
metrics.TaskAttempt.With(e.chasmMetricsHandler).Record(attempt)
812818
}
813819
}
814820

821+
func (e *executableImpl) resetAttempt() {
822+
e.attempt.Store(1)
823+
}
824+
815825
func (e *executableImpl) refreshMetricsHandlers(executionMetricTags []metrics.Tag) {
816826
sharedTags := taskBaseMetricTagsWithoutArchetype(
817827
e.GetTask(),

service/history/queues/executable_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)