Skip to content

Commit 323e85c

Browse files
rkannan82 and claude committed
Add WorkerCommandsPartition type and partition metrics tag
When the SDK sets Kind=TASK_QUEUE_KIND_WORKER_COMMANDS, the server now creates a WorkerCommandsPartition (similar to StickyPartition). Metrics for these partitions are tagged partition=__worker_commands__, and their taskqueue tag is always reported as __omitted__ so that per-worker queue names do not cause a metrics cardinality explosion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 526bf30 commit 323e85c

3 files changed

Lines changed: 212 additions & 11 deletions

File tree

common/metrics/task_queues.go

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -7,9 +7,10 @@ import (
77
)
88

99
const (
10-
omitted = "__omitted__"
11-
normal = "__normal__"
12-
sticky = "__sticky__"
10+
omitted = "__omitted__"
11+
normal = "__normal__"
12+
sticky = "__sticky__"
13+
workerCommands = "__worker_commands__"
1314
)
1415

1516
// GetPerTaskQueueFamilyScope returns "namespace" and "taskqueue" tags. "taskqueue" will be "__omitted__" if
@@ -53,15 +54,19 @@ func GetPerTaskQueuePartitionIDScope(
5354
tags ...Tag,
5455
) Handler {
5556
var value string
56-
if partition == nil {
57+
switch p := partition.(type) {
58+
case nil:
5759
value = unknownValue
58-
} else if normalPartition, ok := partition.(*tqid.NormalPartition); ok {
60+
case *tqid.NormalPartition:
5961
if partitionIDBreakdown {
60-
value = strconv.Itoa(normalPartition.PartitionId())
62+
value = strconv.Itoa(p.PartitionId())
6163
} else {
6264
value = normal
6365
}
64-
} else {
66+
case *tqid.WorkerCommandsPartition:
67+
value = workerCommands
68+
taskQueueBreakdown = false
69+
default:
6570
value = sticky
6671
}
6772

@@ -70,7 +75,7 @@ func GetPerTaskQueuePartitionIDScope(
7075
}
7176

7277
// GetPerTaskQueuePartitionTypeScope returns GetPerTaskQueueScope scope plus a "partition" tag which
73-
// can be "__normal__", "__sticky__", or "_unknown_".
78+
// can be "__normal__", "__sticky__", "__worker_commands__", or "_unknown_".
7479
func GetPerTaskQueuePartitionTypeScope(
7580
handler Handler,
7681
namespaceName string,
@@ -79,11 +84,15 @@ func GetPerTaskQueuePartitionTypeScope(
7984
tags ...Tag,
8085
) Handler {
8186
var value string
82-
if partition == nil {
87+
switch partition.(type) {
88+
case nil:
8389
value = unknownValue
84-
} else if _, ok := partition.(*tqid.NormalPartition); ok {
90+
case *tqid.NormalPartition:
8591
value = normal
86-
} else {
92+
case *tqid.WorkerCommandsPartition:
93+
value = workerCommands
94+
taskQueueBreakdown = false
95+
default:
8796
value = sticky
8897
}
8998

common/tqid/task_queue_id.go

Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,14 @@ type (
8888
taskQueue *TaskQueue
8989
}
9090

91+
// WorkerCommandsPartition is used for server-to-worker communication (e.g. activity cancellations).
92+
// These queues are per-worker and only exist for the lifetime of the worker process. The SDK sets
93+
// Kind=TASK_QUEUE_KIND_WORKER_COMMANDS when polling on these queues.
94+
WorkerCommandsPartition struct {
95+
name string
96+
taskQueue *TaskQueue
97+
}
98+
9199
// PartitionKey uniquely identifies a task queue partition, to be used in maps.
92100
// Note that task queue kind (sticky vs normal) and normal name for sticky task queues are not
93101
// part of the task queue partition identity.
@@ -101,6 +109,7 @@ type (
101109

102110
var _ Partition = (*NormalPartition)(nil)
103111
var _ Partition = (*StickyPartition)(nil)
112+
var _ Partition = (*WorkerCommandsPartition)(nil)
104113

105114
var (
106115
ErrNoParent = errors.New("root task queue partition has no parent")
@@ -139,6 +148,9 @@ func UnsafePartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string,
139148
case enumspb.TASK_QUEUE_KIND_STICKY:
140149
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetNormalName()}, taskType}
141150
return tq.StickyPartition(proto.GetName())
151+
case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS:
152+
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType}
153+
return tq.WorkerCommandsPartition(proto.GetName())
142154
default:
143155
tq := &TaskQueue{TaskQueueFamily{namespaceId, proto.GetName()}, taskType}
144156
return tq.RootPartition()
@@ -164,6 +176,12 @@ func PartitionFromProto(proto *taskqueuepb.TaskQueue, namespaceId string, taskTy
164176
}
165177
tq := &TaskQueue{TaskQueueFamily{namespaceId, normalName}, taskType}
166178
return tq.StickyPartition(baseName), nil
179+
case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS:
180+
if partition != 0 {
181+
return nil, serviceerror.NewInvalidArgumentf("worker-commands partitions cannot have non-zero partition ID. base name: %s", baseName)
182+
}
183+
tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType}
184+
return tq.WorkerCommandsPartition(baseName), nil
167185
default:
168186
tq := &TaskQueue{TaskQueueFamily{namespaceId, baseName}, taskType}
169187
return tq.NormalPartition(partition), nil
@@ -239,6 +257,10 @@ func (n *TaskQueue) StickyPartition(stickyName string) *StickyPartition {
239257
return &StickyPartition{stickyName, n}
240258
}
241259

260+
func (n *TaskQueue) WorkerCommandsPartition(name string) *WorkerCommandsPartition {
261+
return &WorkerCommandsPartition{name, n}
262+
}
263+
242264
func (n *TaskQueue) RootPartition() *NormalPartition {
243265
return n.NormalPartition(0)
244266
}
@@ -300,6 +322,51 @@ func (s *StickyPartition) GradualChangeKey() []byte {
300322
return []byte(key)
301323
}
302324

325+
func (w *WorkerCommandsPartition) TaskType() enumspb.TaskQueueType {
326+
return w.taskQueue.TaskType()
327+
}
328+
329+
func (w *WorkerCommandsPartition) Kind() enumspb.TaskQueueKind {
330+
return enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS
331+
}
332+
333+
func (w *WorkerCommandsPartition) NamespaceId() string {
334+
return w.taskQueue.family.NamespaceId()
335+
}
336+
337+
func (w *WorkerCommandsPartition) TaskQueue() *TaskQueue {
338+
return w.taskQueue
339+
}
340+
341+
func (w *WorkerCommandsPartition) IsRoot() bool {
342+
return false
343+
}
344+
345+
func (w *WorkerCommandsPartition) IsChild() bool {
346+
return false
347+
}
348+
349+
func (w *WorkerCommandsPartition) RpcName() string {
350+
return w.name
351+
}
352+
353+
func (w *WorkerCommandsPartition) Key() PartitionKey {
354+
return PartitionKey{
355+
namespaceId: w.NamespaceId(),
356+
name: w.name,
357+
taskType: w.TaskType(),
358+
}
359+
}
360+
361+
func (w *WorkerCommandsPartition) RoutingKey(int) (string, int) {
362+
return fmt.Sprintf("%s:%s:%d", w.NamespaceId(), w.RpcName(), w.TaskType()), 0
363+
}
364+
365+
func (w *WorkerCommandsPartition) GradualChangeKey() []byte {
366+
key := fmt.Sprintf("%s:%s:%d", w.NamespaceId(), w.RpcName(), w.TaskType())
367+
return []byte(key)
368+
}
369+
303370
func (p *NormalPartition) TaskQueue() *TaskQueue {
304371
return p.taskQueue
305372
}

service/matching/matching_engine_test.go

Lines changed: 125 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5232,6 +5232,131 @@ func (*testTaskManager) CountTaskQueuesByBuildId(context.Context, *persistence.C
52325232
return 0, nil
52335233
}
52345234

5235+
// TestLoggerAndMetricsForPartition_BreakdownEnabled verifies partition and taskqueue tag values
5236+
// with the default BreakdownMetricsByTaskQueue=true: normal queues show their name, sticky queues
5237+
// show the base name, and worker-commands queues get partition=__worker_commands__ with
5238+
// taskqueue=__omitted__.
5239+
func TestLoggerAndMetricsForPartition_BreakdownEnabled(t *testing.T) {
5240+
t.Parallel()
5241+
5242+
controller := gomock.NewController(t)
5243+
ns, mockNamespaceCache := createMockNamespaceCache(controller, matchingTestNamespace)
5244+
config := defaultTestConfig()
5245+
e := createTestMatchingEngine(log.NewTestLogger(), controller, config, nil, mockNamespaceCache)
5246+
captureHandler := metricstest.NewCaptureHandler()
5247+
e.metricsHandler = captureHandler
5248+
5249+
tests := []struct {
5250+
name string
5251+
partition tqid.Partition
5252+
expectTQValue string
5253+
expectPartitionTag string
5254+
}{
5255+
{
5256+
name: "normal task queue uses actual queue name",
5257+
partition: newRootPartition(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS),
5258+
expectTQValue: "my-task-queue",
5259+
expectPartitionTag: "0",
5260+
},
5261+
{
5262+
name: "worker-commands queue gets __worker_commands__ partition and __omitted__ taskqueue",
5263+
partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition("/temporal-sys/worker-commands/ns/key"),
5264+
expectTQValue: "__omitted__",
5265+
expectPartitionTag: "__worker_commands__",
5266+
},
5267+
{
5268+
name: "sticky task queue uses base queue name",
5269+
partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()),
5270+
expectTQValue: "my-task-queue",
5271+
expectPartitionTag: "__sticky__",
5272+
},
5273+
}
5274+
5275+
for _, tc := range tests {
5276+
t.Run(tc.name, func(t *testing.T) {
5277+
capture := captureHandler.StartCapture()
5278+
tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), config, matchingTestNamespace)
5279+
_, _, handler := e.loggerAndMetricsForPartition(ns, tc.partition, tqConfig)
5280+
metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1)
5281+
snap := capture.Snapshot()
5282+
captureHandler.StopCapture(capture)
5283+
recordings := snap["poll_success"]
5284+
require.NotEmpty(t, recordings, "expected poll_success metric to be recorded")
5285+
found := false
5286+
for _, rec := range recordings {
5287+
if rec.Tags["taskqueue"] == tc.expectTQValue && rec.Tags["partition"] == tc.expectPartitionTag {
5288+
found = true
5289+
}
5290+
}
5291+
assert.True(t, found, "expected taskqueue=%q partition=%q, got: %v", tc.expectTQValue, tc.expectPartitionTag, recordings)
5292+
})
5293+
}
5294+
}
5295+
5296+
// TestLoggerAndMetricsForPartition_BreakdownDisabled verifies behavior with BreakdownMetricsByTaskQueue=false:
5297+
// normal and sticky queues get taskqueue=__omitted__, and worker-commands queues also get __omitted__
5298+
// with partition=__worker_commands__.
5299+
func TestLoggerAndMetricsForPartition_BreakdownDisabled(t *testing.T) {
5300+
t.Parallel()
5301+
5302+
controller := gomock.NewController(t)
5303+
ns, mockNamespaceCache := createMockNamespaceCache(controller, matchingTestNamespace)
5304+
dc := dynamicconfig.StaticClient{
5305+
dynamicconfig.MetricsBreakdownByTaskQueue.Key(): false,
5306+
}
5307+
config := NewConfig(dynamicconfig.NewCollection(dc, log.NewNoopLogger()))
5308+
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueue(100 * time.Millisecond)
5309+
e := createTestMatchingEngine(log.NewTestLogger(), controller, config, nil, mockNamespaceCache)
5310+
captureHandler := metricstest.NewCaptureHandler()
5311+
e.metricsHandler = captureHandler
5312+
5313+
tests := []struct {
5314+
name string
5315+
partition tqid.Partition
5316+
expectTQValue string
5317+
expectPartitionTag string
5318+
}{
5319+
{
5320+
name: "normal task queue is omitted when breakdown disabled",
5321+
partition: newRootPartition(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS),
5322+
expectTQValue: "__omitted__",
5323+
expectPartitionTag: "0",
5324+
},
5325+
{
5326+
name: "worker-commands queue gets __worker_commands__ partition when breakdown disabled",
5327+
partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_NEXUS).WorkerCommandsPartition("/temporal-sys/worker-commands/ns/key"),
5328+
expectTQValue: "__omitted__",
5329+
expectPartitionTag: "__worker_commands__",
5330+
},
5331+
{
5332+
name: "sticky task queue is omitted when breakdown disabled",
5333+
partition: newTestTaskQueue(ns.ID().String(), "my-task-queue", enumspb.TASK_QUEUE_TYPE_WORKFLOW).StickyPartition(uuid.NewString()),
5334+
expectTQValue: "__omitted__",
5335+
expectPartitionTag: "__sticky__",
5336+
},
5337+
}
5338+
5339+
for _, tc := range tests {
5340+
t.Run(tc.name, func(t *testing.T) {
5341+
capture := captureHandler.StartCapture()
5342+
tqConfig := newTaskQueueConfig(tc.partition.TaskQueue(), config, matchingTestNamespace)
5343+
_, _, handler := e.loggerAndMetricsForPartition(ns, tc.partition, tqConfig)
5344+
metrics.PollSuccessPerTaskQueueCounter.With(handler).Record(1)
5345+
snap := capture.Snapshot()
5346+
captureHandler.StopCapture(capture)
5347+
recordings := snap["poll_success"]
5348+
require.NotEmpty(t, recordings, "expected poll_success metric to be recorded")
5349+
found := false
5350+
for _, rec := range recordings {
5351+
if rec.Tags["taskqueue"] == tc.expectTQValue && rec.Tags["partition"] == tc.expectPartitionTag {
5352+
found = true
5353+
}
5354+
}
5355+
assert.True(t, found, "expected taskqueue=%q partition=%q, got: %v", tc.expectTQValue, tc.expectPartitionTag, recordings)
5356+
})
5357+
}
5358+
}
5359+
52355360
func TestConvertPollWorkflowTaskQueueResponse(t *testing.T) {
52365361
t.Parallel()
52375362

0 commit comments

Comments
 (0)