Skip to content

Commit 0845a2a

Browse files
authored
implement Pause/UnpauseActivityExecution for standalone activities (#9851)
## What changed?

Implement `PauseActivityExecution` and `UnpauseActivityExecution` for standalone activities (SAA). Previously, both handlers returned `Unimplemented` for the SAA path. They now use `chasm.UpdateComponent` to apply pause/unpause state directly to the CHASM Activity component, matching the semantics of the existing workflow-activity implementation.

- Proto (`activity_state.proto`): Added an `ActivityPauseState` message (`pause_time`, `identity`, `reason`) and a `pause_state` field on `ActivityState`.
- `handlePauseRequested`: Sets `PauseState` on the component. If the activity is in the `SCHEDULED` state, it increments the attempt stamp so that the existing `ActivityDispatchTask` is invalidated, preventing the activity from being dispatched to a worker while paused. For `STARTED` activities the stamp is left unchanged; the worker retains a valid token and receives `ActivityPaused: true` on its next heartbeat.
- `handleUnpauseRequested`: Clears `PauseState`, optionally resets the attempt count and/or heartbeat details, and, if the activity is `SCHEDULED`, bumps the stamp and enqueues a new `ActivityDispatchTask` with optional jitter.
- `RecordHeartbeat`: Wires up the `ActivityPaused` response field.
- `buildActivityExecutionInfo`: Maps the pause state to `PENDING_ACTIVITY_STATE_PAUSED` (the activity is scheduled but not running) or `PENDING_ACTIVITY_STATE_PAUSE_REQUESTED` (the activity is running on a worker) in the `RunState` field of `DescribeActivityExecution`.

## Why?

`PauseActivityExecution` / `UnpauseActivityExecution` were already implemented for workflow-embedded activities via the history service. Standalone activities had stub handlers that returned `Unimplemented`; this change brings SAA to feature parity with workflow activities for the pause/unpause lifecycle operations.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [ ] added new unit test(s)
- [X] added new functional test(s)

## Potential risks

Minimal. This is a translation of an existing API (Pause/UnpauseActivity).
1 parent 0cbb52d commit 0845a2a

12 files changed

Lines changed: 2225 additions & 108 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 175 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package activity
33
import (
44
"errors"
55
"fmt"
6+
"math/rand"
67
"slices"
78
"time"
89

@@ -93,6 +94,19 @@ type RespondCancelledEvent struct {
9394
Token *tokenspb.Task
9495
}
9596

97+
func (a *Activity) isTerminal() bool {
98+
switch a.GetStatus() {
99+
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED,
100+
activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
101+
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
102+
activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED,
103+
activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT:
104+
return true
105+
default:
106+
return false
107+
}
108+
}
109+
96110
// LifecycleState implements the chasm.Component interface.
97111
func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
98112
switch a.Status {
@@ -590,8 +604,11 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request
590604
return &activitypb.RequestCancelActivityExecutionResponse{}, nil
591605
}
592606

593-
// If in scheduled state, cancel immediately right after marking cancel requested
594-
isCancelImmediately := a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED
607+
// SCHEDULED and PAUSED activities have no active worker token so cancel immediately.
608+
// STARTED and CANCEL_REQUESTED activities wait for the worker to respond.
609+
originalStatus := a.GetStatus()
610+
isCancelImmediately := originalStatus == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED ||
611+
originalStatus == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED
595612

596613
if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil {
597614
return nil, err
@@ -611,7 +628,7 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request
611628
err = TransitionCanceled.Apply(a, ctx, cancelEvent{
612629
details: details,
613630
handler: metricsHandler,
614-
fromStatus: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, // if we're here the original status was scheduled
631+
fromStatus: originalStatus,
615632
})
616633
if err != nil {
617634
return nil, err
@@ -621,6 +638,125 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request
621638
return &activitypb.RequestCancelActivityExecutionResponse{}, nil
622639
}
623640

641+
func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activitypb.PauseActivityExecutionRequest) (
642+
*activitypb.PauseActivityExecutionResponse, error,
643+
) {
644+
if a.isTerminal() {
645+
return nil, serviceerror.NewFailedPreconditionf("activity is in terminal state %v", a.GetStatus())
646+
}
647+
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
648+
return nil, serviceerror.NewFailedPrecondition("cannot pause an activity with a pending cancellation")
649+
}
650+
if a.PauseState != nil {
651+
return &activitypb.PauseActivityExecutionResponse{}, nil
652+
}
653+
654+
metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityPausedScope)
655+
if err != nil {
656+
return nil, err
657+
}
658+
659+
if TransitionPaused.Possible(a) {
660+
// SCHEDULED → real PAUSED status; stamp bumped to invalidate the pending dispatch task.
661+
if err := TransitionPaused.Apply(a, ctx, pauseEvent{
662+
req: req.GetFrontendRequest(),
663+
metricsHandler: metricsHandler,
664+
}); err != nil {
665+
return nil, err
666+
}
667+
return &activitypb.PauseActivityExecutionResponse{}, nil
668+
}
669+
// STARTED → flag-only pause. Status stays STARTED so the worker's token remains valid.
670+
// The worker will see ActivityPaused=true on the next heartbeat.
671+
a.pause(ctx, pauseEvent{req.GetFrontendRequest(), metricsHandler})
672+
return &activitypb.PauseActivityExecutionResponse{}, nil
673+
}
674+
675+
func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activitypb.UnpauseActivityExecutionRequest) (
676+
*activitypb.UnpauseActivityExecutionResponse, error,
677+
) {
678+
if a.isTerminal() {
679+
return nil, serviceerror.NewFailedPreconditionf("activity is in terminal state %v", a.GetStatus())
680+
}
681+
// Not paused → no-op.
682+
if a.PauseState == nil {
683+
return &activitypb.UnpauseActivityExecutionResponse{}, nil
684+
}
685+
686+
metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityUnpausedScope)
687+
if err != nil {
688+
return nil, err
689+
}
690+
691+
if TransitionUnpaused.Possible(a) {
692+
if err := TransitionUnpaused.Apply(a, ctx, unpauseEvent{
693+
req: req.GetFrontendRequest(),
694+
metricsHandler: metricsHandler,
695+
}); err != nil {
696+
return nil, err
697+
}
698+
return &activitypb.UnpauseActivityExecutionResponse{}, nil
699+
}
700+
701+
// Flag-based pause (status is STARTED, CANCEL_REQUESTED, or SCHEDULED after retry while paused).
702+
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
703+
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
704+
// Worker continues with its existing token — no stamp bump needed, no dispatch task.
705+
// Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch;
706+
// the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds.
707+
a.PauseState = nil
708+
a.emitOnUnpausedMetrics(metricsHandler)
709+
return &activitypb.UnpauseActivityExecutionResponse{}, nil
710+
}
711+
a.unpause(ctx, unpauseEvent{
712+
req: req.GetFrontendRequest(),
713+
metricsHandler: metricsHandler,
714+
})
715+
return &activitypb.UnpauseActivityExecutionResponse{}, nil
716+
}
717+
718+
func (a *Activity) unpause(
719+
ctx chasm.MutableContext,
720+
event unpauseEvent,
721+
) {
722+
a.PauseState = nil
723+
attempt := a.LastAttempt.Get(ctx)
724+
if event.req.GetResetAttempts() {
725+
attempt.Count = 1
726+
}
727+
if event.req.GetResetHeartbeat() {
728+
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{})
729+
}
730+
attempt.Stamp++
731+
attempt.CurrentRetryInterval = nil
732+
scheduleTime := ctx.Now(a)
733+
if jitter := event.req.GetJitter().AsDuration(); jitter > 0 {
734+
scheduleTime = scheduleTime.Add(time.Duration(rand.Int63n(int64(jitter)))) //nolint:gosec
735+
}
736+
if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 {
737+
ctx.AddTask(
738+
a,
739+
chasm.TaskAttributes{ScheduledTime: scheduleTime.Add(timeout)},
740+
&activitypb.ScheduleToStartTimeoutTask{Stamp: attempt.GetStamp()})
741+
}
742+
ctx.AddTask(
743+
a,
744+
chasm.TaskAttributes{ScheduledTime: scheduleTime},
745+
&activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()})
746+
a.emitOnUnpausedMetrics(event.metricsHandler)
747+
}
748+
func (a *Activity) pause(
749+
ctx chasm.MutableContext,
750+
event pauseEvent,
751+
) {
752+
a.PauseState = &activitypb.ActivityPauseState{
753+
PauseTime: timestamppb.New(ctx.Now(a)),
754+
Identity: event.req.GetIdentity(),
755+
Reason: event.req.GetReason(),
756+
}
757+
a.emitOnPausedMetrics(event.metricsHandler)
758+
}
759+
624760
// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
625761
// set the outcome failure directly and leave the attempt failure as is.
626762
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
@@ -671,7 +807,9 @@ func (a *Activity) recordFailedAttempt(
671807
}
672808

673809
// tryReschedule attempts to reschedule the activity for retry. Returns true if rescheduled, false
674-
// if retry is not possible.
810+
// if retry is not possible. When the activity has PauseState set (flag-based pause from STARTED),
811+
// the retry transitions to SCHEDULED normally but the dispatch task is blocked by the pause flag
812+
// until the activity is unpaused.
675813
func (a *Activity) tryReschedule(
676814
ctx chasm.MutableContext,
677815
overridingRetryInterval time.Duration,
@@ -767,7 +905,8 @@ func (a *Activity) RecordHeartbeat(
767905
}
768906
return &historyservice.RecordActivityTaskHeartbeatResponse{
769907
CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
770-
// TODO(saa-preview): ActivityPaused, ActivityReset
908+
ActivityPaused: a.PauseState != nil,
909+
// TODO(saa-preview): ActivityReset
771910
}, nil
772911
}
773912

@@ -776,7 +915,8 @@ func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumsp
776915
switch status {
777916
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
778917
activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
779-
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
918+
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
919+
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
780920
return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
781921
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
782922
return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED
@@ -803,6 +943,8 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb
803943
return enumspb.PENDING_ACTIVITY_STATE_STARTED
804944
case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
805945
return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
946+
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
947+
return enumspb.PENDING_ACTIVITY_STATE_PAUSED
806948
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED,
807949
activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
808950
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
@@ -816,9 +958,20 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb
816958
}
817959

818960
func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.ActivityExecutionInfo {
819-
// TODO(saa-preview): support pause states
820961
status := InternalStatusToAPIStatus(a.GetStatus())
821-
runState := internalStatusToRunState(a.GetStatus())
962+
// Derive the external run state with hybrid pause logic:
963+
// PAUSED status (real) → PAUSED
964+
// STARTED + PauseState != nil (pause requested while running) → PAUSE_REQUESTED
965+
// SCHEDULED + PauseState != nil (retry while paused flag set) → PAUSED
966+
// All other cases → derived from internal status directly
967+
var runState enumspb.PendingActivityState
968+
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && a.PauseState != nil {
969+
runState = enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED
970+
} else if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && a.PauseState != nil {
971+
runState = enumspb.PENDING_ACTIVITY_STATE_PAUSED
972+
} else {
973+
runState = internalStatusToRunState(a.GetStatus())
974+
}
822975

823976
requestData := a.RequestData.Get(ctx)
824977
attempt := a.LastAttempt.Get(ctx)
@@ -1104,6 +1257,20 @@ func (a *Activity) emitOnTimedOutMetrics(
11041257
metrics.ActivityTimeout.With(handler).Record(1, timeoutTag)
11051258
}
11061259

1260+
func (a *Activity) emitOnPausedMetrics(
1261+
handler metrics.Handler,
1262+
) {
1263+
metrics.ActivityPauseRequests.With(handler).Record(1)
1264+
metrics.ActivityPause.With(handler).Record(1)
1265+
}
1266+
1267+
func (a *Activity) emitOnUnpausedMetrics(
1268+
handler metrics.Handler,
1269+
) {
1270+
metrics.ActivityUnpauseRequests.With(handler).Record(1)
1271+
metrics.ActivityUnpause.With(handler).Record(1)
1272+
}
1273+
11071274
// SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface.
11081275
// Returns the current search attribute values for this activity execution.
11091276
func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue {

chasm/lib/activity/activity_tasks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ func (h *activityDispatchTaskHandler) Validate(
3636
task *activitypb.ActivityDispatchTask,
3737
) (bool, error) {
3838
// TODO(saa-preview): make sure we handle resets when we support them, as they will reset the attempt count
39+
// Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry
40+
// while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for
41+
// real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause).
3942
return (TransitionStarted.Possible(activity) &&
43+
activity.PauseState == nil &&
4044
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil
4145
}
4246

@@ -93,7 +97,9 @@ func (h *scheduleToStartTimeoutTaskHandler) Validate(
9397
_ chasm.TaskAttributes,
9498
task *activitypb.ScheduleToStartTimeoutTask,
9599
) (bool, error) {
100+
// Do not time out a SCHEDULED activity that has the pause flag set (retry while paused).
96101
return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED &&
102+
activity.PauseState == nil &&
97103
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil
98104
}
99105

chasm/lib/activity/frontend.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,15 @@ func (h *frontendHandler) PauseActivityExecution(
468468
return nil, ErrStandaloneActivityDisabled
469469
}
470470

471-
// TODO: validate request fields (e.g. namespace, identity length)
471+
if err := validatePauseActivityExecutionRequest(
472+
req,
473+
h.config.MaxIDLengthLimit(),
474+
h.config.BlobSizeLimitError,
475+
h.config.BlobSizeLimitWarn,
476+
h.logger); err != nil {
477+
return nil, err
478+
}
479+
472480
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
473481
if err != nil {
474482
return nil, err
@@ -492,7 +500,10 @@ func (h *frontendHandler) UnpauseActivityExecution(
492500
return nil, ErrStandaloneActivityDisabled
493501
}
494502

495-
// TODO: validate request fields (e.g. namespace, identity length)
503+
if err := validateUnpauseActivityExecutionRequest(req, h.config.MaxIDLengthLimit()); err != nil {
504+
return nil, err
505+
}
506+
496507
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
497508
if err != nil {
498509
return nil, err

chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)