Skip to content

Commit dd32c8f

Browse files
rkannan82 and claude authored
Retry ForwardPoll on ResourceExhausted instead of disabling forwarding (#10019)
## What On child partitions, when `ForwardPoll` gets a `ResourceExhausted` error, re-enqueue the poller with forwarding still enabled so it retries. ## Why When `frontend.enableCancelWorkerPollsOnShutdown` is enabled, a wave of cancelled polls followed by re-polls can trigger rate limiting on the root partition. The `forwardPolls` goroutine was treating `ResourceExhausted` the same as other errors — permanently disabling forwarding by setting `forwardCtx = nil`. This caused the poll to fall back to waiting for a local task match, which on a child partition with no backlog means waiting the full 60s long-poll timeout before the poller retries. The `forwardTasks` goroutine already had proper `ResourceExhausted` retry logic with exponential backoff; `forwardPolls` was missing it. ## How did you test it? Unit test (`TestForwardPollRetriesOnResourceExhausted`) that sets up a child partition with a mock matching client returning `ResourceExhausted` on the first `ForwardPoll` call and a valid task on the second, then verifies the poll succeeds via retry. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 481bafc commit dd32c8f

4 files changed

Lines changed: 114 additions & 5 deletions

File tree

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,6 +1450,11 @@ second per poller by one physical queue manager`,
14501450
60*time.Second,
14511451
`Timeout for forwarded backlog task (requires new matcher)`,
14521452
)
1453+
MatchingForwardPollRetryMaxInterval = NewTaskQueueDurationSetting(
1454+
"matching.forwardPollRetryMaxInterval",
1455+
10*time.Second,
1456+
`Max backoff interval when retrying a rate-limited ForwardPoll from a child partition`,
1457+
)
14531458
MatchingFairnessCounter = NewTaskQueueTypedSetting(
14541459
"matching.fairnessCounter",
14551460
counter.DefaultCounterParams,

service/matching/config.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,12 @@ type (
9797
MaxFairnessKeyWeightOverrides dynamicconfig.IntPropertyFnWithTaskQueueFilter
9898

9999
// Time to hold a poll request before returning an empty response if there are no tasks
100-
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
101-
BacklogTaskForwardTimeout dynamicconfig.DurationPropertyFnWithTaskQueueFilter
102-
MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
103-
MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
104-
TaskDeleteInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
100+
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
101+
BacklogTaskForwardTimeout dynamicconfig.DurationPropertyFnWithTaskQueueFilter
102+
ForwardPollRetryMaxInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
103+
MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
104+
MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
105+
TaskDeleteInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
105106

106107
// taskWriter configuration
107108
OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskQueueFilter
@@ -156,6 +157,7 @@ type (
156157
// Time to hold a poll request before returning an empty response if there are no tasks
157158
LongPollExpirationInterval func() time.Duration
158159
BacklogTaskForwardTimeout func() time.Duration
160+
ForwardPollRetryMaxInterval func() time.Duration
159161
RangeSize int64
160162
NewMatcher bool
161163
NewMatcherSub func(func(dynamicconfig.GradualChange[bool])) (dynamicconfig.GradualChange[bool], func())
@@ -285,6 +287,7 @@ func NewConfig(
285287
MaxTaskQueueIdleTime: dynamicconfig.MatchingMaxTaskQueueIdleTime.Get(dc),
286288
LongPollExpirationInterval: dynamicconfig.MatchingLongPollExpirationInterval.Get(dc),
287289
BacklogTaskForwardTimeout: dynamicconfig.MatchingBacklogTaskForwardTimeout.Get(dc),
290+
ForwardPollRetryMaxInterval: dynamicconfig.MatchingForwardPollRetryMaxInterval.Get(dc),
288291
MinTaskThrottlingBurstSize: dynamicconfig.MatchingMinTaskThrottlingBurstSize.Get(dc),
289292
MaxTaskDeleteBatchSize: dynamicconfig.MatchingMaxTaskDeleteBatchSize.Get(dc),
290293
TaskDeleteInterval: dynamicconfig.MatchingTaskDeleteInterval.Get(dc),
@@ -439,6 +442,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
439442
BacklogTaskForwardTimeout: func() time.Duration {
440443
return config.BacklogTaskForwardTimeout(ns.String(), taskQueueName, taskType)
441444
},
445+
ForwardPollRetryMaxInterval: func() time.Duration {
446+
return config.ForwardPollRetryMaxInterval(ns.String(), taskQueueName, taskType)
447+
},
442448
MaxTaskDeleteBatchSize: func() int {
443449
return config.MaxTaskDeleteBatchSize(ns.String(), taskQueueName, taskType)
444450
},

service/matching/pri_matcher.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ func (tm *priTaskMatcher) forwardPolls(
293293
ft pollForwarderType,
294294
target *tqid.NormalPartition,
295295
) {
296+
policy := backoff.NewExponentialRetryPolicy(time.Second).
297+
WithMaximumInterval(tm.config.ForwardPollRetryMaxInterval()).
298+
WithExpirationInterval(backoff.NoInterval)
299+
retrier := backoff.NewRetrier(policy, clock.NewRealTimeSource())
296300
forwarderTask := newPollForwarderTask(effectivePriority, ft)
297301
ctxs := []context.Context{ctx} // ctx should be equal to or child of tm.tqCtx
298302
for ctx.Err() == nil {
@@ -332,6 +336,7 @@ func (tm *priTaskMatcher) forwardPolls(
332336
_ = stop()
333337
if err == nil {
334338
tm.data.FinishMatchAfterPollForward(poller, task)
339+
retrier.Reset()
335340
if ft == priorityBacklogPollForwarder {
336341
metrics.PriorityBacklogForwardedPerTaskQueueCounter.With(tm.metricsHandler).Record(1)
337342
}
@@ -343,6 +348,10 @@ func (tm *priTaskMatcher) forwardPolls(
343348
// 4× to allow for a few rounds plus propagation.
344349
interval := cmp.Or(tm.config.EphemeralDataUpdateInterval(), time.Minute)
345350
_ = util.InterruptibleSleep(ctx, 4*interval)
351+
} else if common.IsResourceExhausted(err) {
352+
// Rate limited: re-enqueue with forwarding still enabled so it retries.
353+
tm.data.ReenqueuePollerIfNotMatched(poller)
354+
_ = util.InterruptibleSleep(ctx, retrier.NextBackOff(err))
346355
} else {
347356
// Re-enqueue to let it match again, if it hasn't gotten a context timeout already.
348357
poller.forwardCtx = nil // disable forwarding next time

service/matching/pri_matcher_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@ import (
44
"context"
55
"sync/atomic"
66
"testing"
7+
"testing/synctest"
78
"time"
89

10+
"github.com/stretchr/testify/require"
911
"github.com/stretchr/testify/suite"
1012
enumspb "go.temporal.io/api/enums/v1"
13+
"go.temporal.io/api/serviceerror"
14+
"go.temporal.io/server/api/matchingservice/v1"
15+
"go.temporal.io/server/api/matchingservicemock/v1"
1116
persistencespb "go.temporal.io/server/api/persistence/v1"
1217
"go.temporal.io/server/common/dynamicconfig"
1318
"go.temporal.io/server/common/log"
1419
"go.temporal.io/server/common/metrics"
20+
"go.temporal.io/server/common/testing/testhooks"
1521
"go.temporal.io/server/common/testing/testlogger"
1622
"go.temporal.io/server/common/tqid"
1723
"go.uber.org/mock/gomock"
@@ -103,3 +109,86 @@ func (s *PriMatcherSuite) TestValidatorWorksOnRoot() {
103109

104110
s.True(validatorValidatedTask.Load(), "Validator should have called maybeValidate")
105111
}
112+
113+
// TestForwardPollRetriesOnResourceExhausted verifies that when a child partition's
114+
// ForwardPoll gets a ResourceExhausted error (rate limited), the poller is re-enqueued
115+
// with forwarding still enabled and retries until it succeeds. This is a regression test
116+
// for a bug where ForwardPoll permanently disabled forwarding on transient rate-limit
117+
// errors, causing polls to wait for the full 60s timeout instead of retrying.
118+
func (s *PriMatcherSuite) TestForwardPollRetriesOnResourceExhausted() {
119+
// Use synctest to virtualize time so the backoff sleep is instant.
120+
synctest.Test(s.T(), func(t *testing.T) {
121+
ctx, cancel := context.WithCancel(context.Background())
122+
defer cancel()
123+
124+
tq := tqid.UnsafeTaskQueueFamily("nsid", "tq").TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW)
125+
childPartition := tq.NormalPartition(1) // child partition /1
126+
127+
cfg := newTaskQueueConfig(tq, NewConfig(dynamicconfig.NewNoopCollection()), "nsname")
128+
// Use a generous poll timeout so we can distinguish retry success from timeout.
129+
cfg.LongPollExpirationInterval = func() time.Duration { return 10 * time.Second }
130+
131+
mockClient := matchingservicemock.NewMockMatchingServiceClient(s.controller)
132+
133+
// First ForwardPoll call: return ResourceExhausted (simulating rate limit storm).
134+
// Second call: return a valid task response.
135+
rateLimitErr := serviceerror.NewResourceExhausted(
136+
enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rate limit exceeded",
137+
)
138+
taskToken := []byte("test-task-token")
139+
140+
gomock.InOrder(
141+
mockClient.EXPECT().
142+
PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
143+
Return(nil, rateLimitErr),
144+
mockClient.EXPECT().
145+
PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).
146+
Return(&matchingservice.PollWorkflowTaskQueueResponse{
147+
TaskToken: taskToken,
148+
}, nil),
149+
)
150+
151+
// Create a priForwarder for the child partition (non-nil fwdr triggers child behavior).
152+
queue := UnversionedQueueKey(childPartition)
153+
fwdr, err := newPriForwarder(
154+
&cfg.forwarderConfig,
155+
queue,
156+
mockClient,
157+
testhooks.TestHooks{},
158+
)
159+
require.NoError(t, err)
160+
161+
rateLimitManager := newRateLimitManager(&mockUserDataManager{}, cfg, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
162+
163+
tm := newPriTaskMatcher(
164+
ctx,
165+
cfg,
166+
childPartition,
167+
fwdr,
168+
mockClient,
169+
nil, // no validator needed on child
170+
s.logger,
171+
metrics.NoopMetricsHandler,
172+
rateLimitManager,
173+
func() {},
174+
)
175+
176+
tm.Start()
177+
defer tm.Stop()
178+
179+
// Poll from the child partition. The forwardPolls goroutine should:
180+
// 1. Pick up this poller
181+
// 2. Try ForwardPoll → get ResourceExhausted
182+
// 3. Re-enqueue poller with forwarding still enabled
183+
// 4. Try ForwardPoll again → succeed with task token
184+
// 5. Return the task to the poller
185+
pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second)
186+
defer pollCancel()
187+
188+
task, err := tm.Poll(pollCtx, &pollMetadata{})
189+
require.NoError(t, err)
190+
require.NotNil(t, task, "poll should have received a task via forwarding retry")
191+
require.True(t, task.isStarted(), "task should be a started (forwarded) task")
192+
require.Equal(t, taskToken, task.started.workflowTaskInfo.TaskToken)
193+
})
194+
}

0 commit comments

Comments (0)