Skip to content

Commit 8436aed

Browse files
authored
Allow config.ReplicationLowPriorityTaskParallelism to take effect. (#10051)
## What changed?

Use `WorkflowKey`, rather than a string, as the queue ID for the low- and high-priority replication schedulers. For the low-priority scheduler, deterministically map each RunID to one of `config.ReplicationLowPriorityTaskParallelism` slots.

## Why?

Using strings for the queue ID prevented the concurrent tx map from optimising queue operations. Although we had code to parallelise tasks for different executions of the same workflow ID, that code was not reachable. Parallelisation now happens at the queue ID level, which is subjectively more intuitive.

## Potential risks

`config.ReplicationLowPriorityTaskParallelism` may already be set to a non-default value on some clusters, where it currently has no effect. After this change the configured parallelism will take effect, which may cause surprising behaviour.
1 parent 4a15166 commit 8436aed

1 file changed

Lines changed: 17 additions & 12 deletions

File tree

  • service/history/replication

service/history/replication/fx.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package replication
22

33
import (
44
"context"
5-
"math/rand"
65
"strconv"
76

87
"github.com/dgryski/go-farm"
@@ -181,22 +180,26 @@ func replicationStreamLowPrioritySchedulerProvider(
181180
metricsHandler metrics.Handler,
182181
lc fx.Lifecycle,
183182
) ctasks.Scheduler[TrackableExecutableTask] {
183+
// P-way parallelism for executions of the same workflow (per ReplicationLowPriorityTaskParallelism)
184+
// is modeled as P distinct per-namespace-workflow queue IDs. We bucket by execution (RunID) so all
185+
// low-priority tasks for one execution share a queue; the third field stores the slot index, not
186+
// the run UUID.
184187
queueFactory := func(task TrackableExecutableTask) ctasks.SequentialTaskQueue[TrackableExecutableTask] {
185188
item := task.QueueID()
186189
workflowKey, ok := item.(definition.WorkflowKey)
187190
if !ok {
188191
return NewSequentialTaskQueueWithID(item)
189192
}
190-
return NewSequentialTaskQueueWithID(workflowKey.NamespaceID + "_" + workflowKey.WorkflowID)
191-
}
192-
taskQueueHashFunc := func(item any) uint32 {
193-
workflowKey, ok := item.(definition.WorkflowKey)
194-
if !ok {
195-
return 0
193+
parallelism := config.ReplicationLowPriorityTaskParallelism()
194+
if parallelism < 1 {
195+
parallelism = 1
196196
}
197-
198-
idBytes := []byte(workflowKey.NamespaceID + "_" + workflowKey.WorkflowID + "_" + strconv.Itoa(rand.Intn(config.ReplicationLowPriorityTaskParallelism())))
199-
return farm.Fingerprint32(idBytes)
197+
// 0..parallelism-1, stable for a given RunID. Different runs of the same workflow may map to
198+
// different slots, so up to P sequential queues (and workers) can progress them concurrently.
199+
slot := int(farm.Fingerprint32([]byte(workflowKey.RunID)) % uint32(parallelism))
200+
return NewSequentialTaskQueueWithID(
201+
definition.NewWorkflowKey(workflowKey.NamespaceID, workflowKey.WorkflowID, strconv.Itoa(slot)),
202+
)
200203
}
201204
// SequentialScheduler has panic wrapper when executing task,
202205
// if changing the executor, please make sure other executor has panic wrapper
@@ -205,7 +208,7 @@ func replicationStreamLowPrioritySchedulerProvider(
205208
QueueSize: config.ReplicationProcessorSchedulerQueueSize(),
206209
WorkerCount: config.ReplicationLowPriorityProcessorSchedulerWorkerCount,
207210
},
208-
taskQueueHashFunc,
211+
WorkflowKeyHashFn,
209212
queueFactory,
210213
logger,
211214
)
@@ -303,7 +306,9 @@ func sequentialTaskQueueFactoryProvider(
303306
if !ok {
304307
return NewSequentialTaskQueueWithID(item)
305308
}
306-
return NewSequentialTaskQueueWithID(workflowKey.NamespaceID + "_" + workflowKey.WorkflowID)
309+
return NewSequentialTaskQueueWithID(
310+
definition.NewWorkflowKey(workflowKey.NamespaceID, workflowKey.WorkflowID, ""),
311+
)
307312
}
308313
}
309314

0 commit comments

Comments
 (0)