Skip to content

Commit 056da25

Browse files
authored
Merge branch 'main' into kannan/ephemeral-partition-property
2 parents 884103a + 76beb85 commit 056da25

3 files changed

Lines changed: 55 additions & 4 deletions

File tree

chasm/lib/activity/activity_tasks.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ func (h *startToCloseTimeoutTaskHandler) Validate(
163163
_ chasm.TaskAttributes,
164164
task *activitypb.StartToCloseTimeoutTask,
165165
) (bool, error) {
166-
valid := (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
166+
valid := ((activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
167+
activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) &&
167168
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp())
168169
return valid, nil
169170
}

common/persistence/sql/sqlplugin/mysql/visibility.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,15 @@ func (mdb *db) DeleteFromVisibility(
197197
retError = fmt.Errorf("transaction rollback failed: %w", retError)
198198
}
199199
}()
200-
_, err = mdb.NamedExecContext(ctx, templateDeleteCustomSearchAttributes, filter)
200+
_, err = tx.NamedExecContext(ctx, templateDeleteCustomSearchAttributes, filter)
201201
if err != nil {
202202
return nil, fmt.Errorf("unable to delete custom search attributes: %w", err)
203203
}
204-
result, err = mdb.NamedExecContext(ctx, templateDeleteWorkflowExecution_v8, filter)
204+
result, err = tx.NamedExecContext(ctx, templateDeleteWorkflowExecution_v8, filter)
205205
if err != nil {
206206
return nil, fmt.Errorf("unable to delete workflow execution: %w", err)
207207
}
208-
_, err = mdb.NamedExecContext(ctx, templateDeleteChasmSearchAttributes, filter)
208+
_, err = tx.NamedExecContext(ctx, templateDeleteChasmSearchAttributes, filter)
209209
if err != nil {
210210
return nil, fmt.Errorf("unable to delete chasm search attributes: %w", err)
211211
}

tests/standalone_activity_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2672,6 +2672,56 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() {
26722672
"expected StartToCloseTimeout but is %s", describeResp3.GetOutcome().GetFailure().GetTimeoutFailureInfo().GetTimeoutType())
26732673
}
26742674

2675+
// TestStartToCloseTimeout_WhileCancelRequested verifies that an activity
2676+
// times out due to start-to-close timeout, after a cancellation request has been accepted.
2677+
func (s *standaloneActivityTestSuite) TestStartToCloseTimeout_WhileCancelRequested() {
2678+
t := s.T()
2679+
ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second)
2680+
defer cancel()
2681+
2682+
activityID := testcore.RandomizeStr(t.Name())
2683+
taskQueue := testcore.RandomizeStr(t.Name())
2684+
2685+
startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{
2686+
Namespace: s.Namespace().String(),
2687+
ActivityId: activityID,
2688+
ActivityType: &commonpb.ActivityType{Name: "test-activity"},
2689+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
2690+
StartToCloseTimeout: durationpb.New(2 * time.Second),
2691+
RetryPolicy: &commonpb.RetryPolicy{MaximumAttempts: 1},
2692+
})
2693+
require.NoError(t, err)
2694+
2695+
// Worker accepts the task — activity is STARTED.
2696+
_, err = s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
2697+
Namespace: s.Namespace().String(),
2698+
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
2699+
})
2700+
require.NoError(t, err)
2701+
2702+
// Request cancellation — activity moves to CANCEL_REQUESTED.
2703+
_, err = s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{
2704+
Namespace: s.Namespace().String(),
2705+
ActivityId: activityID,
2706+
RunId: startResp.RunId,
2707+
Identity: "canceller",
2708+
RequestId: "cancel-req-1",
2709+
})
2710+
require.NoError(t, err)
2711+
2712+
// Worker ignores cancellation and doesn't respond.
2713+
// The start-to-close timeout (2s) should still fire.
2714+
pollOutcome, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{
2715+
Namespace: s.Namespace().String(),
2716+
ActivityId: activityID,
2717+
RunId: startResp.RunId,
2718+
})
2719+
require.NoError(t, err)
2720+
require.Equal(t, enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
2721+
pollOutcome.GetOutcome().GetFailure().GetTimeoutFailureInfo().GetTimeoutType(),
2722+
"activity in CANCEL_REQUESTED should still time out via START_TO_CLOSE")
2723+
}
2724+
26752725
// TestScheduleToStartTimeout tests that a schedule-to-start timeout is recorded after the activity is
26762726
// created but never started. It also verifies that DescribeActivityExecution can be used to long-poll for a TimedOut
26772727
// state change caused by execution of a timer task.

0 commit comments

Comments
 (0)