Skip to content

Commit de79e50

Browse files
Shivs11claude
andcommitted
Add integration test and point go.mod to api-go branch
TestStalePartition_RevisionSuppressesTrampolining verifies that when a stale matching partition sends an outdated target version with a low revision number, the revision-based comparison in case 4 of the target version change switch suppresses the signal. Also verifies that a genuinely new version with a higher revision correctly fires the signal. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 54474af commit de79e50

3 files changed

Lines changed: 148 additions & 4 deletions

File tree

go.mod

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ require (
5959
go.opentelemetry.io/otel/sdk v1.40.0
6060
go.opentelemetry.io/otel/sdk/metric v1.40.0
6161
go.opentelemetry.io/otel/trace v1.40.0
62-
go.temporal.io/api v1.62.8-0.20260406230818-5423d0dd678a
62+
go.temporal.io/api v1.62.3-0.20260409181818-debaf76d420a
6363
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
6464
go.temporal.io/sdk v1.41.1
6565
go.uber.org/fx v1.24.0
@@ -157,7 +157,7 @@ require (
157157
github.com/go-logr/logr v1.4.3 // indirect
158158
github.com/go-logr/stdr v1.2.2 // indirect
159159
github.com/gogo/protobuf v1.3.2 // indirect
160-
github.com/golang/mock v1.6.0 // indirect
160+
github.com/golang/mock v1.6.0
161161
github.com/golang/snappy v0.0.4 // indirect
162162
github.com/google/s2a-go v0.1.9 // indirect
163163
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
@@ -216,5 +216,3 @@ require (
216216
modernc.org/mathutil v1.7.1 // indirect
217217
modernc.org/memory v1.11.0 // indirect
218218
)
219-
220-
replace go.temporal.io/api => /Users/shivamsaraf/Desktop/work/api-go

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,8 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY
440440
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
441441
go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4=
442442
go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE=
443+
go.temporal.io/api v1.62.3-0.20260409181818-debaf76d420a h1:Dya7I98BYaFduHgPOc22M4zCyWFCx5DzLeGOzeiwblQ=
444+
go.temporal.io/api v1.62.3-0.20260409181818-debaf76d420a/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
443445
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
444446
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
445447
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=

tests/versioning_3_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5900,6 +5900,150 @@ func (s *Versioning3Suite) TestRemoveOverride_ClearsDeclinedState() {
59005900
})
59015901
}
59025902

5903+
// TestStalePartition_RevisionSuppressesTrampolining verifies that when a stale
5904+
// matching partition sends an outdated target version, the revision-based
5905+
// comparison in case 4 suppresses the signal and prevents trampolining.
5906+
//
5907+
// Flow:
5908+
// 1. Start pinned workflow on v1, set v1 as current
5909+
// 2. Set v2 as current (revision increments)
5910+
// 3. Set v3 as current (revision increments again)
5911+
// 4. Trigger WFT → signal fires (target=v3 at high revision), CaN with decline
5912+
// 5. Roll back task queue to v2 with revision 0 (simulating stale partition)
5913+
// 6. Trigger WFT → assert targetDeploymentVersionChanged=false (revision 0 <= declined revision)
5914+
// 7. Set v4 as current (fresh version with higher revision)
5915+
// 8. Trigger WFT → assert targetDeploymentVersionChanged=true (new revision > declined revision)
5916+
func (s *Versioning3Suite) TestStalePartition_RevisionSuppressesTrampolining() {
5917+
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
5918+
s.OverrideDynamicConfig(dynamicconfig.MatchingNumTaskqueueWritePartitions, 1)
5919+
5920+
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
5921+
defer cancel()
5922+
5923+
tv1 := testvars.New(s).WithBuildIDNumber(1)
5924+
tv2 := tv1.WithBuildIDNumber(2)
5925+
tv3 := tv1.WithBuildIDNumber(3)
5926+
5927+
// Start async poller for v1 that handles first WFT and declares pinned behavior
5928+
wftCompleted := make(chan struct{})
5929+
s.pollWftAndHandle(tv1, false, wftCompleted,
5930+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
5931+
s.NotNil(task)
5932+
return respondEmptyWft(tv1, false, vbPinned), nil
5933+
})
5934+
5935+
// Register v1 and set it as current
5936+
s.waitForDeploymentDataPropagation(tv1, versionStatusInactive, false, tqTypeWf)
5937+
s.setCurrentDeployment(tv1)
5938+
5939+
// Start workflow — first WFT handled by async poller (pinned on v1)
5940+
runID := s.startWorkflow(tv1, nil)
5941+
execution := tv1.WithRunID(runID).WorkflowExecution()
5942+
s.WaitForChannel(ctx, wftCompleted)
5943+
s.verifyWorkflowVersioning(s.Assertions, tv1, vbPinned, tv1.Deployment(), nil, nil)
5944+
5945+
// Register v2, set v2 as current (revision increments)
5946+
s.idlePollWorkflow(ctx, tv2, true, ver3MinPollTime, "v2 poller registration")
5947+
s.setCurrentDeployment(tv2)
5948+
5949+
// Register v3, set v3 as current (revision increments again)
5950+
s.idlePollWorkflow(ctx, tv3, true, ver3MinPollTime, "v3 poller registration")
5951+
s.setCurrentDeployment(tv3)
5952+
5953+
// Trigger WFT — target should be v3 with a high revision
5954+
s.triggerNormalWFT(ctx, tv1, execution)
5955+
5956+
// Process: targetDeploymentVersionChanged=true → CaN without AU (decline v3)
5957+
s.pollWftAndHandle(tv1, false, nil,
5958+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
5959+
s.NotNil(task)
5960+
var lastStarted *historypb.HistoryEvent
5961+
for _, event := range task.History.GetEvents() {
5962+
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
5963+
lastStarted = event
5964+
}
5965+
}
5966+
s.NotNil(lastStarted)
5967+
s.True(lastStarted.GetWorkflowTaskStartedEventAttributes().GetTargetWorkerDeploymentVersionChanged(),
5968+
"expected true after v3 becomes current")
5969+
5970+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
5971+
Commands: []*commandpb.Command{
5972+
{
5973+
CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION,
5974+
Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{
5975+
ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
5976+
WorkflowType: tv1.WorkflowType(),
5977+
TaskQueue: tv1.TaskQueue(),
5978+
Input: tv1.Any().Payloads(),
5979+
},
5980+
},
5981+
},
5982+
},
5983+
VersioningBehavior: vbPinned,
5984+
DeploymentOptions: tv1.WorkerDeploymentOptions(true),
5985+
}, nil
5986+
})
5987+
5988+
// CaN run: first WFT — declined=v3 propagated from previous run
5989+
s.pollWftAndHandle(tv1, false, nil,
5990+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
5991+
s.NotNil(task)
5992+
s.NotEqual(execution.RunId, task.WorkflowExecution.RunId,
5993+
"CaN should have created a new run")
5994+
execution = task.WorkflowExecution
5995+
return respondEmptyWft(tv1, false, vbPinned), nil
5996+
})
5997+
5998+
// Simulate stale partition: roll back task queue to v2 with revision 0
5999+
s.rollbackTaskQueueToVersion(tv2)
6000+
6001+
// Trigger WFT with stale data — target is now v2 at revision 0
6002+
s.triggerNormalWFT(ctx, tv1, execution)
6003+
6004+
// Assert: revision 0 <= declined revision → signal suppressed
6005+
s.pollWftAndHandle(tv1, false, nil,
6006+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
6007+
s.NotNil(task)
6008+
var lastStarted *historypb.HistoryEvent
6009+
for _, event := range task.History.GetEvents() {
6010+
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
6011+
lastStarted = event
6012+
}
6013+
}
6014+
s.NotNil(lastStarted)
6015+
s.False(lastStarted.GetWorkflowTaskStartedEventAttributes().GetTargetWorkerDeploymentVersionChanged(),
6016+
"stale partition (revision 0) should be suppressed by declined revision")
6017+
return respondEmptyWft(tv1, false, vbPinned), nil
6018+
})
6019+
6020+
// Set a new v4 as current — this produces a revision strictly higher than
6021+
// the declined revision, simulating an up-to-date partition with fresh data.
6022+
tv4 := tv1.WithBuildIDNumber(4)
6023+
s.idlePollWorkflow(ctx, tv4, true, ver3MinPollTime, "v4 poller registration")
6024+
s.setCurrentDeployment(tv4)
6025+
s.waitForDeploymentDataPropagation(tv4, versionStatusCurrent, false, tqTypeWf)
6026+
6027+
// Trigger WFT with fresh data — target is v4 at higher revision
6028+
s.triggerNormalWFT(ctx, tv1, execution)
6029+
6030+
// Assert: new revision > declined revision → signal fires
6031+
s.pollWftAndHandle(tv1, false, nil,
6032+
func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
6033+
s.NotNil(task)
6034+
var lastStarted *historypb.HistoryEvent
6035+
for _, event := range task.History.GetEvents() {
6036+
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
6037+
lastStarted = event
6038+
}
6039+
}
6040+
s.NotNil(lastStarted)
6041+
s.True(lastStarted.GetWorkflowTaskStartedEventAttributes().GetTargetWorkerDeploymentVersionChanged(),
6042+
"up-to-date partition with higher revision should fire signal")
6043+
return respondCompleteWorkflow(tv1, vbPinned), nil
6044+
})
6045+
}
6046+
59036047
// TestRetryOfDeclinedCaN_SignalsOnNewTarget verifies that when a CaN'd run
59046048
// ,which declined to upgrade, fails and is retried by the server, the retry
59056049
// run inherits NotificationSuppressedTargetVersion from the original CaN

0 commit comments

Comments
 (0)