Skip to content

Commit 8e2ff4f

Browse files
rkannan82claude
andcommitted
Rename stickyTaskQueueTTL and stickyWorkerAvailable to ephemeral variants
These are used in IsEphemeral() code paths that apply to all ephemeral partition types, not just sticky queues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 056da25 commit 8e2ff4f

2 files changed

Lines changed: 20 additions & 13 deletions

File tree

service/matching/db.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
)
2222

2323
const (
24-
initialRangeID = 1 // Id of the first range of a new task queue
25-
stickyTaskQueueTTL = 24 * time.Hour
24+
initialRangeID = 1 // Id of the first range of a new task queue
25+
ephemeralTaskQueueTTL = 24 * time.Hour
2626

2727
// Subqueue zero corresponds to "the queue" before migrating metadata to subqueues.
2828
// For backwards compatibility, some operations only apply to subqueue zero for now.
@@ -283,10 +283,10 @@ func (db *taskQueueDB) SyncState(ctx context.Context) error {
283283
defer db.Unlock()
284284
defer db.emitPhysicalBacklogGaugesLocked()
285285

286-
// We only need to write if something changed, or if we're past half of the sticky queue TTL.
287-
// Note that we use the same threshold for non-sticky queues even though they don't have a
286+
// We only need to write if something changed, or if we're past half of the ephemeral queue TTL.
287+
// Note that we use the same threshold for non-ephemeral queues even though they don't have a
288288
// persistence TTL, since the scavenger looks for metadata that hasn't been updated in 48 hours.
289-
needWrite := db.lastChange.After(db.lastWrite) || time.Since(db.lastWrite) > stickyTaskQueueTTL/2
289+
needWrite := db.lastChange.After(db.lastWrite) || time.Since(db.lastWrite) > ephemeralTaskQueueTTL/2
290290
if !needWrite {
291291
return nil
292292
}
@@ -732,7 +732,7 @@ func (db *taskQueueDB) AllocateSubqueue(
732732

733733
func (db *taskQueueDB) expiryTime() *timestamppb.Timestamp {
734734
if db.queue.Partition().IsEphemeral() {
735-
return timestamppb.New(time.Now().Add(stickyTaskQueueTTL))
735+
return timestamppb.New(time.Now().Add(ephemeralTaskQueueTTL))
736736
}
737737
return nil
738738
}

service/matching/matching_engine.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import (
5353
"go.temporal.io/server/common/resource"
5454
"go.temporal.io/server/common/searchattribute"
5555
serviceerrors "go.temporal.io/server/common/serviceerror"
56+
"go.temporal.io/server/common/softassert"
5657
"go.temporal.io/server/common/stream_batcher"
5758
"go.temporal.io/server/common/taskqueue"
5859
"go.temporal.io/server/common/tasktoken"
@@ -67,7 +68,7 @@ import (
6768
)
6869

6970
const (
70-
// If sticky poller is not seem in last 10s, we treat it as sticky worker unavailable
71+
// If sticky poller is not seen in last 10s, we treat it as sticky worker unavailable.
7172
// This seems aggressive, but the default sticky schedule_to_start timeout is 5s, so 10s seems reasonable.
7273
stickyPollerUnavailableWindow = 10 * time.Second
7374

@@ -570,13 +571,16 @@ func (e *matchingEngineImpl) AddWorkflowTask(
570571
if err != nil {
571572
return "", false, err
572573
}
574+
if !softassert.That(e.logger, partition.Kind() == enumspb.TASK_QUEUE_KIND_NORMAL || partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY, //nolint:forbidigo
575+
"AddWorkflowTask called with unexpected partition kind") {
576+
return "", false, serviceerror.NewInternal("AddWorkflowTask called with unexpected partition kind")
577+
}
573578

574-
ephemeral := partition.IsEphemeral()
575579
// do not load ephemeral task queue if it is not already loaded, which means it has no poller.
576-
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !ephemeral, loadCauseTask)
580+
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !partition.IsEphemeral(), loadCauseTask)
577581
if err != nil {
578582
return "", false, err
579-
} else if ephemeral && !stickyWorkerAvailable(pm) {
583+
} else if partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY && !stickyWorkerAvailable(pm) {
580584
return "", false, serviceerrors.NewStickyWorkerUnavailable()
581585
}
582586

@@ -1089,12 +1093,15 @@ func (e *matchingEngineImpl) QueryWorkflow(
10891093
if err != nil {
10901094
return nil, err
10911095
}
1092-
ephemeral := partition.IsEphemeral()
1096+
if !softassert.That(e.logger, partition.Kind() == enumspb.TASK_QUEUE_KIND_NORMAL || partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY, //nolint:forbidigo
1097+
"QueryWorkflow called with unexpected partition kind") {
1098+
return nil, serviceerror.NewInternal("QueryWorkflow called with unexpected partition kind")
1099+
}
10931100
// do not load ephemeral task queue if it is not already loaded, which means it has no poller.
1094-
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !ephemeral, loadCauseQuery)
1101+
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !partition.IsEphemeral(), loadCauseQuery)
10951102
if err != nil {
10961103
return nil, err
1097-
} else if ephemeral && !stickyWorkerAvailable(pm) {
1104+
} else if partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY && !stickyWorkerAvailable(pm) {
10981105
return nil, serviceerrors.NewStickyWorkerUnavailable()
10991106
}
11001107

0 commit comments

Comments
 (0)