Skip to content

Commit f9b2f6d

Browse files
authored
Nexus workflow operation cancellation for CHASM (#9915)
## What changed? Full implementation of the cancellation path for workflow Nexus operations on CHASM. Including: - State machine logic - Component logic - Workflow event handlers - Integration into workflows - Executors I also ended up doing quite a bit of refactoring to reuse as much of the code as possible across the cancellation and invocation executors. Also added missing test coverage for the invocation executors. I tried to structure this into separate commits that make reviews easy but ended up with a lot of fixes in later commits that touched code in earlier commits. Best to review in one piece.
1 parent f5246b3 commit f9b2f6d

19 files changed

Lines changed: 2490 additions & 732 deletions

chasm/lib/nexusoperation/cancellation.go

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package nexusoperation
22

33
import (
4+
"time"
5+
6+
commonpb "go.temporal.io/api/common/v1"
7+
failurepb "go.temporal.io/api/failure/v1"
8+
"go.temporal.io/api/serviceerror"
49
"go.temporal.io/server/chasm"
5-
"go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
10+
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
11+
"go.temporal.io/server/common/backoff"
612
)
713

814
var _ chasm.Component = (*Cancellation)(nil)
@@ -14,15 +20,18 @@ type Cancellation struct {
1420

1521
// Persisted internal state
1622
*nexusoperationpb.CancellationState
23+
24+
// Operation is a pointer to the parent Operation component.
25+
Operation chasm.ParentPtr[*Operation]
1726
}
1827

1928
func newCancellation(state *nexusoperationpb.CancellationState) *Cancellation {
2029
return &Cancellation{CancellationState: state}
2130
}
2231

2332
// LifecycleState maps the cancellation's status to a CHASM lifecycle state.
24-
func (o *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
25-
switch o.Status {
33+
func (c *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
34+
switch c.Status {
2635
case nexusoperationpb.CANCELLATION_STATUS_SUCCEEDED:
2736
return chasm.LifecycleStateCompleted
2837
case nexusoperationpb.CANCELLATION_STATUS_FAILED,
@@ -34,11 +43,106 @@ func (o *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
3443
}
3544

3645
// StateMachineState returns the current cancellation status.
37-
func (o *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus {
38-
return o.Status
46+
func (c *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus {
47+
return c.Status
3948
}
4049

4150
// SetStateMachineState sets the cancellation status.
42-
func (o *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus) {
43-
o.Status = status
51+
func (c *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus) {
52+
c.Status = status
53+
}
54+
55+
// cancelArgs holds the arguments needed to cancel a Nexus operation.
56+
type cancelArgs struct {
57+
service string
58+
operation string
59+
token string
60+
requestID string
61+
endpointName string
62+
endpointID string
63+
currentTime time.Time
64+
scheduledTime time.Time
65+
startedTime time.Time
66+
scheduleToCloseTimeout time.Duration
67+
startToCloseTimeout time.Duration
68+
headers map[string]string
69+
payload *commonpb.Payload
70+
}
71+
72+
func (c *Cancellation) onCompleted(ctx chasm.MutableContext) error {
73+
op := c.Operation.Get(ctx)
74+
if store, ok := op.Store.TryGet(ctx); ok {
75+
return store.OnNexusOperationCancellationCompleted(ctx, op)
76+
}
77+
return TransitionCancellationSucceeded.Apply(c, ctx, EventCancellationSucceeded{})
78+
}
79+
80+
func (c *Cancellation) onFailed(ctx chasm.MutableContext, failure *failurepb.Failure) error {
81+
op := c.Operation.Get(ctx)
82+
if store, ok := op.Store.TryGet(ctx); ok {
83+
return store.OnNexusOperationCancellationFailed(ctx, op, failure)
84+
}
85+
return TransitionCancellationFailed.Apply(c, ctx, EventCancellationFailed{
86+
Failure: failure,
87+
})
88+
}
89+
90+
// loadArgs loads the cancel arguments from the cancellation and its parent operation.
91+
func (c *Cancellation) loadArgs(
92+
ctx chasm.Context,
93+
_ chasm.NoValue,
94+
) (cancelArgs, error) {
95+
op := c.Operation.Get(ctx)
96+
97+
var invocationData InvocationData
98+
if store, ok := op.Store.TryGet(ctx); ok {
99+
var err error
100+
invocationData, err = store.NexusOperationInvocationData(ctx, op)
101+
if err != nil {
102+
return cancelArgs{}, err
103+
}
104+
}
105+
// TODO: For standalone operations, load invocation data from the operation state.
106+
107+
return cancelArgs{
108+
service: op.GetService(),
109+
operation: op.GetOperation(),
110+
token: op.GetOperationToken(),
111+
requestID: op.GetRequestId(),
112+
endpointName: op.GetEndpoint(),
113+
endpointID: op.GetEndpointId(),
114+
currentTime: ctx.Now(c),
115+
scheduledTime: op.GetScheduledTime().AsTime(),
116+
startedTime: op.GetStartedTime().AsTime(),
117+
scheduleToCloseTimeout: op.GetScheduleToCloseTimeout().AsDuration(),
118+
startToCloseTimeout: op.GetStartToCloseTimeout().AsDuration(),
119+
headers: invocationData.Header,
120+
payload: invocationData.Input,
121+
}, nil
122+
}
123+
124+
// saveCancellationResultInput is the input to the Cancellation.saveResult method.
125+
type saveCancellationResultInput struct {
126+
result cancellationResult
127+
retryPolicy func() backoff.RetryPolicy
128+
}
129+
130+
// saveResult applies the outcome of a cancel operation call to the cancellation state machine.
131+
func (c *Cancellation) saveResult(
132+
ctx chasm.MutableContext,
133+
input saveCancellationResultInput,
134+
) (chasm.NoValue, error) {
135+
switch r := input.result.(type) {
136+
case cancellationResultOK:
137+
return nil, c.onCompleted(ctx)
138+
case cancellationResultFail:
139+
return nil, c.onFailed(ctx, r.failure)
140+
case cancellationResultRetry:
141+
return nil, transitionCancellationAttemptFailed.Apply(c, ctx, EventCancellationAttemptFailed{
142+
Failure: r.failure,
143+
RetryPolicy: input.retryPolicy(),
144+
})
145+
default:
146+
return nil, serviceerror.NewInternalf("cannot save cancellation result of type %T", r)
147+
}
44148
}

chasm/lib/nexusoperation/cancellation_statemachine.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,20 @@ import (
1111
// EventCancellationScheduled is triggered when cancellation is meant to be scheduled for the first time - immediately
1212
// after it has been requested.
1313
type EventCancellationScheduled struct {
14+
// Destination is the endpoint name for the cancellation task.
15+
// Must be provided by the caller because ParentPtr is not available during inline creation.
16+
Destination string
1417
}
1518

1619
var TransitionCancellationScheduled = chasm.NewTransition(
1720
[]nexusoperationpb.CancellationStatus{nexusoperationpb.CANCELLATION_STATUS_UNSPECIFIED},
1821
nexusoperationpb.CANCELLATION_STATUS_SCHEDULED,
1922
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationScheduled) error {
20-
ctx.AddTask(c, chasm.TaskAttributes{}, &nexusoperationpb.CancellationTask{
23+
c.Attempt++
24+
25+
ctx.AddTask(c, chasm.TaskAttributes{
26+
Destination: event.Destination,
27+
}, &nexusoperationpb.CancellationTask{
2128
Attempt: c.Attempt,
2229
})
2330

@@ -34,9 +41,12 @@ var transitionCancellationRescheduled = chasm.NewTransition(
3441
[]nexusoperationpb.CancellationStatus{nexusoperationpb.CANCELLATION_STATUS_BACKING_OFF},
3542
nexusoperationpb.CANCELLATION_STATUS_SCHEDULED,
3643
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationRescheduled) error {
44+
c.Attempt++
3745
c.NextAttemptScheduleTime = nil
3846

39-
ctx.AddTask(c, chasm.TaskAttributes{}, &nexusoperationpb.CancellationTask{
47+
ctx.AddTask(c, chasm.TaskAttributes{
48+
Destination: c.Operation.Get(ctx).GetEndpoint(),
49+
}, &nexusoperationpb.CancellationTask{
4050
Attempt: c.Attempt,
4151
})
4252

@@ -56,7 +66,6 @@ var transitionCancellationAttemptFailed = chasm.NewTransition(
5666
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationAttemptFailed) error {
5767
currentTime := ctx.Now(c)
5868

59-
c.Attempt++
6069
c.LastAttemptCompleteTime = timestamppb.New(currentTime)
6170
c.LastAttemptFailure = event.Failure
6271

@@ -76,17 +85,22 @@ var transitionCancellationAttemptFailed = chasm.NewTransition(
7685

7786
// EventCancellationFailed is triggered when a cancellation attempt is failed with a non retryable error.
7887
type EventCancellationFailed struct {
88+
Failure *failurepb.Failure
7989
}
8090

8191
var TransitionCancellationFailed = chasm.NewTransition(
8292
[]nexusoperationpb.CancellationStatus{
8393
// We can immediately transition to failed since we don't know how to send a cancellation request for an
8494
// unstarted operation.
95+
// TODO: This doesn't seem to happen in either the HSM or CHASM implementations.
8596
nexusoperationpb.CANCELLATION_STATUS_UNSPECIFIED,
8697
nexusoperationpb.CANCELLATION_STATUS_SCHEDULED,
8798
},
8899
nexusoperationpb.CANCELLATION_STATUS_FAILED,
89100
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationFailed) error {
101+
currentTime := ctx.Now(c)
102+
c.LastAttemptCompleteTime = timestamppb.New(currentTime)
103+
c.LastAttemptFailure = event.Failure
90104
// Terminal state - no tasks to emit.
91105
return nil
92106
},
@@ -100,6 +114,10 @@ var TransitionCancellationSucceeded = chasm.NewTransition(
100114
[]nexusoperationpb.CancellationStatus{nexusoperationpb.CANCELLATION_STATUS_SCHEDULED},
101115
nexusoperationpb.CANCELLATION_STATUS_SUCCEEDED,
102116
func(c *Cancellation, ctx chasm.MutableContext, event EventCancellationSucceeded) error {
117+
currentTime := ctx.Now(c)
118+
c.LastAttemptCompleteTime = timestamppb.New(currentTime)
119+
c.LastAttemptFailure = nil
120+
103121
// Terminal state - no tasks to emit.
104122
return nil
105123
},

0 commit comments

Comments
 (0)