Skip to content

Commit 80b781d

Browse files
committed
Nexus Standalone: assorted post-merge cleanup
1 parent 8342fc5 commit 80b781d

17 files changed

Lines changed: 655 additions & 314 deletions

chasm/lib/nexusoperation/cancellation.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,13 @@ func (c *Cancellation) loadArgs(
102102
if err != nil {
103103
return cancelArgs{}, err
104104
}
105+
} else {
106+
requestData := op.RequestData.Get(ctx)
107+
invocationData = InvocationData{
108+
Input: requestData.GetInput(),
109+
Header: requestData.GetNexusHeader(),
110+
}
105111
}
106-
// TODO: For standalone operations, load invocation data from the operation state.
107112

108113
return cancelArgs{
109114
service: op.GetService(),
@@ -149,7 +154,6 @@ func (c *Cancellation) saveResult(
149154
}
150155

151156
func CancellationAPIState(status nexusoperationpb.CancellationStatus) enumspb.NexusOperationCancellationState {
152-
// TODO(samm): deduplicate against standalone nexus operations
153157
switch status {
154158
case nexusoperationpb.CANCELLATION_STATUS_SCHEDULED:
155159
return enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED

chasm/lib/nexusoperation/cancellation_tasks_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/nexus-rpc/sdk-go/nexus"
1010
"github.com/stretchr/testify/require"
11+
commonpb "go.temporal.io/api/common/v1"
1112
enumspb "go.temporal.io/api/enums/v1"
1213
failurepb "go.temporal.io/api/failure/v1"
1314
"go.temporal.io/api/serviceerror"
@@ -268,6 +269,39 @@ func TestCancellationBackoffTaskHandler_Execute(t *testing.T) {
268269
require.True(t, ok, "expected CancellationTask")
269270
}
270271

272+
func TestCancellationLoadArgs_StandaloneFallsBackToRequestData(t *testing.T) {
273+
now := time.Now().UTC()
274+
input := &commonpb.Payload{Data: []byte("test-input")}
275+
headers := map[string]string{"test-header": "test-value"}
276+
277+
op := NewOperation(&nexusoperationpb.OperationState{
278+
Service: "test-service",
279+
Operation: "test-operation",
280+
OperationToken: "test-operation-token",
281+
RequestId: "test-request-id",
282+
Endpoint: "test-endpoint",
283+
EndpointId: "test-endpoint-id",
284+
ScheduledTime: timestamppb.New(now.Add(-2 * time.Minute)),
285+
StartedTime: timestamppb.New(now.Add(-1 * time.Minute)),
286+
ScheduleToCloseTimeout: durationpb.New(10 * time.Minute),
287+
StartToCloseTimeout: durationpb.New(5 * time.Minute),
288+
})
289+
op.RequestData = chasm.NewDataField(nil, &nexusoperationpb.OperationRequestData{
290+
Input: input,
291+
NexusHeader: headers,
292+
})
293+
294+
cancellation := newCancellation(&nexusoperationpb.CancellationState{})
295+
cancellation.Operation = chasm.NewMockParentPtr(op)
296+
297+
args, err := cancellation.loadArgs(&chasm.MockContext{
298+
HandleNow: func(chasm.Component) time.Time { return now },
299+
}, nil)
300+
require.NoError(t, err)
301+
protorequire.ProtoEqual(t, input, args.payload)
302+
require.Equal(t, headers, args.headers)
303+
}
304+
271305
func TestCancellationInvocationTaskHandler_HTTP(t *testing.T) {
272306
cases := []struct {
273307
name string

chasm/lib/nexusoperation/frontend.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ func NewFrontendHandler(
6363
}
6464
}
6565

66-
func (*frontendHandler) CountNexusOperationExecutions(context.Context, *workflowservice.CountNexusOperationExecutionsRequest) (*workflowservice.CountNexusOperationExecutionsResponse, error) {
67-
return nil, ErrStandaloneNexusOperationDisabled
68-
}
69-
7066
func (h *frontendHandler) StartNexusOperationExecution(
7167
ctx context.Context,
7268
req *workflowservice.StartNexusOperationExecutionRequest,
@@ -322,3 +318,8 @@ func (h *frontendHandler) DeleteNexusOperationExecution(
322318

323319
return &workflowservice.DeleteNexusOperationExecutionResponse{}, nil
324320
}
321+
322+
// isStandaloneNexusOperationEnabled checks if standalone Nexus operations are enabled for the given namespace.
323+
func (h *frontendHandler) isStandaloneNexusOperationEnabled(namespaceName string) bool {
324+
return h.config.EnableChasm(namespaceName) && h.config.Enabled(namespaceName)
325+
}

chasm/lib/nexusoperation/handler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ func (h *handler) StartNexusOperation(
5252
)
5353
if err != nil {
5454
if alreadyStartedErr, ok := errors.AsType[*chasm.ExecutionAlreadyStartedError](err); ok {
55-
return nil, serviceerror.NewAlreadyExistsf(
55+
return nil, serviceerror.NewNexusOperationExecutionAlreadyStartedf(
56+
alreadyStartedErr.CurrentRequestID,
57+
alreadyStartedErr.CurrentRunID,
5658
"nexus operation execution already started: request_id=%s, run_id=%s",
5759
alreadyStartedErr.CurrentRequestID,
5860
alreadyStartedErr.CurrentRunID,

chasm/lib/nexusoperation/library.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"google.golang.org/grpc"
77
)
88

9-
const TaskGroupName = "nexusoperation"
10-
119
// componentOnlyLibrary registers just the components without task executors or gRPC handlers.
1210
// Used in the frontend to enable component ref serialization.
1311
type componentOnlyLibrary struct {

chasm/lib/nexusoperation/operation.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,27 @@ var _ chasm.StateMachine[nexusoperationpb.OperationStatus] = (*Operation)(nil)
3838
var _ chasm.VisibilitySearchAttributesProvider = (*Operation)(nil)
3939
var _ chasm.NexusCompletionHandler = (*Operation)(nil)
4040

41+
// ErrCancellationAlreadyRequested is returned when a cancellation has already been requested for an operation.
4142
var ErrCancellationAlreadyRequested = serviceerror.NewFailedPrecondition("cancellation already requested")
43+
44+
// ErrOperationAlreadyCompleted is returned when trying to cancel an operation that has already completed.
4245
var ErrOperationAlreadyCompleted = serviceerror.NewFailedPrecondition("operation already completed")
4346

47+
// InvocationData contains data needed to invoke a Nexus operation.
4448
type InvocationData struct {
45-
Input *commonpb.Payload
46-
Header map[string]string
49+
// Input is the operation input payload.
50+
Input *commonpb.Payload
51+
// Header contains the Nexus headers for the operation.
52+
Header map[string]string
53+
// NexusLinks are the links to the caller(s) that scheduled this operation.
4754
NexusLinks []nexus.Link
4855
}
4956

5057
// TaskGroupName groups invocation and cancellation together for the outbound queue
5158
const TaskGroupName = "nexus"
5259

60+
// OperationStore defines the interface that must be implemented by any parent component that wants to manage Nexus operations.
61+
// It's the responsibility of the parent component to apply the appropriate state transitions to the operation.
5362
type OperationStore interface {
5463
OnNexusOperationStarted(ctx chasm.MutableContext, operation *Operation, operationToken string, startTime *time.Time, links []*commonpb.Link) error
5564
OnNexusOperationCanceled(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
@@ -58,21 +67,30 @@ type OperationStore interface {
5867
OnNexusOperationCompleted(ctx chasm.MutableContext, operation *Operation, result *commonpb.Payload, links []*commonpb.Link) error
5968
OnNexusOperationCancellationCompleted(ctx chasm.MutableContext, operation *Operation) error
6069
OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
70+
// NexusOperationInvocationData loads invocation data (Input, Header, NexusLinks) from the scheduled history event.
6171
NexusOperationInvocationData(ctx chasm.Context, operation *Operation) (InvocationData, error)
6272
}
6373

74+
// Operation is a CHASM component that represents a Nexus operation.
6475
type Operation struct {
6576
chasm.UnimplementedComponent
77+
78+
// Persisted internal state
6679
*nexusoperationpb.OperationState
6780

81+
// Pointer to an implementation of the "store". For a workflow-based Nexus operation
82+
// this is a parent pointer back to the workflow. For a standalone Nexus operation this is nil.
6883
Store chasm.ParentPtr[OperationStore]
6984

70-
RequestData chasm.Field[*nexusoperationpb.OperationRequestData]
85+
RequestData chasm.Field[*nexusoperationpb.OperationRequestData]
86+
// Cancellation is a child component that manages sending the cancel request to the Nexus endpoint.
87+
// Created when cancellation is requested, nil otherwise.
7188
Cancellation chasm.Field[*Cancellation]
7289
Outcome chasm.Field[*nexusoperationpb.OperationOutcome]
7390
Visibility chasm.Field[*chasm.Visibility]
7491
}
7592

93+
// NewOperation creates a new Operation component with the given persisted state.
7694
func NewOperation(state *nexusoperationpb.OperationState) *Operation {
7795
return &Operation{OperationState: state}
7896
}
@@ -88,6 +106,8 @@ func newStandaloneOperation(
88106
Service: frontendReq.GetService(),
89107
Operation: frontendReq.GetOperation(),
90108
ScheduleToCloseTimeout: frontendReq.GetScheduleToCloseTimeout(),
109+
ScheduleToStartTimeout: frontendReq.GetScheduleToStartTimeout(),
110+
StartToCloseTimeout: frontendReq.GetStartToCloseTimeout(),
91111
ScheduledTime: timestamppb.New(ctx.Now(nil)),
92112
RequestId: uuid.NewString(),
93113
})
@@ -109,6 +129,7 @@ func newStandaloneOperation(
109129
return op, nil
110130
}
111131

132+
// LifecycleState maps the operation's status to a CHASM lifecycle state.
112133
func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
113134
switch o.Status {
114135
case nexusoperationpb.OPERATION_STATUS_SUCCEEDED:
@@ -127,14 +148,18 @@ func (o *Operation) ContextMetadata(_ chasm.Context) map[string]string {
127148
return nil
128149
}
129150

151+
// StateMachineState returns the current operation status.
130152
func (o *Operation) StateMachineState() nexusoperationpb.OperationStatus {
131153
return o.Status
132154
}
133155

156+
// SetStateMachineState sets the operation status.
134157
func (o *Operation) SetStateMachineState(status nexusoperationpb.OperationStatus) {
135158
o.Status = status
136159
}
137160

161+
// RequestCancel requests cancellation of the operation. It creates a Cancellation child component and, if the
162+
// operation has already started, schedules the cancellation request to be sent to the Nexus endpoint.
138163
func (o *Operation) RequestCancel(
139164
ctx chasm.MutableContext,
140165
req *nexusoperationpb.CancellationState,
@@ -154,6 +179,8 @@ func (o *Operation) RequestCancel(
154179

155180
cancel := newCancellation(req)
156181
o.Cancellation = chasm.NewComponentField(ctx, cancel)
182+
// Once started, the handler returns a token that can be used in the cancellation request.
183+
// Until then, no need to schedule the cancellation.
157184
if o.Status == nexusoperationpb.OPERATION_STATUS_STARTED {
158185
return TransitionCancellationScheduled.Apply(cancel, ctx, EventCancellationScheduled{
159186
Destination: o.GetEndpoint(),
@@ -162,6 +189,7 @@ func (o *Operation) RequestCancel(
162189
return nil
163190
}
164191

192+
// onStarted applies the started transition or delegates to the store if one is present.
165193
func (o *Operation) onStarted(ctx chasm.MutableContext, operationToken string, startTime *time.Time, links []*commonpb.Link) error {
166194
if store, ok := o.Store.TryGet(ctx); ok {
167195
return store.OnNexusOperationStarted(ctx, o, operationToken, startTime, links)
@@ -173,6 +201,7 @@ func (o *Operation) onStarted(ctx chasm.MutableContext, operationToken string, s
173201
})
174202
}
175203

204+
// onCompleted applies the succeeded transition or delegates to the store if one is present.
176205
func (o *Operation) onCompleted(ctx chasm.MutableContext, result *commonpb.Payload, links []*commonpb.Link) error {
177206
if store, ok := o.Store.TryGet(ctx); ok {
178207
return store.OnNexusOperationCompleted(ctx, o, result, links)
@@ -181,20 +210,23 @@ func (o *Operation) onCompleted(ctx chasm.MutableContext, result *commonpb.Paylo
181210
return TransitionSucceeded.Apply(o, ctx, EventSucceeded{Result: result})
182211
}
183212

213+
// onFailed applies the failed transition or delegates to the store if one is present.
184214
func (o *Operation) onFailed(ctx chasm.MutableContext, cause *failurepb.Failure) error {
185215
if store, ok := o.Store.TryGet(ctx); ok {
186216
return store.OnNexusOperationFailed(ctx, o, cause)
187217
}
188218
return TransitionFailed.Apply(o, ctx, EventFailed{Failure: cause})
189219
}
190220

221+
// onCanceled applies the canceled transition or delegates to the store if one is present.
191222
func (o *Operation) onCanceled(ctx chasm.MutableContext, cause *failurepb.Failure) error {
192223
if store, ok := o.Store.TryGet(ctx); ok {
193224
return store.OnNexusOperationCanceled(ctx, o, cause)
194225
}
195226
return TransitionCanceled.Apply(o, ctx, EventCanceled{Failure: cause})
196227
}
197228

229+
// onTimedOut applies the timed out transition or delegates to the store if one is present.
198230
func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failure, fromAttempt bool) error {
199231
if store, ok := o.Store.TryGet(ctx); ok {
200232
return store.OnNexusOperationTimedOut(ctx, o, cause, fromAttempt)
@@ -205,27 +237,31 @@ func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failur
205237
})
206238
}
207239

240+
// HandleNexusCompletion handles the outcome of an asynchronous completion callback.
208241
func (o *Operation) HandleNexusCompletion(
209242
ctx chasm.MutableContext,
210243
completion *persistencespb.ChasmNexusCompletion,
211244
) error {
245+
// Request ID lets us reject a stale or misrouted completion.
212246
if completion.GetRequestId() != "" && o.GetRequestId() != completion.GetRequestId() {
213247
return serviceerror.NewNotFound("operation not found")
214248
}
215249

216250
links := completion.GetLinks()
217251

252+
// For completion-before-start, apply the started transition first.
218253
if o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED {
219254
startTime := timestamp.TimeValuePtr(completion.GetStartTime())
220255
if err := o.onStarted(ctx, completion.GetOperationToken(), startTime, links); err != nil {
221256
return err
222257
}
258+
// Links belong only to the synthetic started event.
223259
links = nil
224260
}
225261

226262
switch outcome := completion.Outcome.(type) {
227263
case *persistencespb.ChasmNexusCompletion_Success:
228-
return o.onCompleted(ctx, outcome.Success, links)
264+
return o.onCompleted(ctx, outcome.Success, nil)
229265
case *persistencespb.ChasmNexusCompletion_Failure:
230266
if outcome.Failure.GetCanceledFailureInfo() != nil {
231267
return o.onCanceled(ctx, outcome.Failure)
@@ -236,6 +272,7 @@ func (o *Operation) HandleNexusCompletion(
236272
}
237273
}
238274

275+
// loadStartArgs is a ReadComponent callback that loads the start arguments from the operation.
239276
func (o *Operation) loadStartArgs(
240277
ctx chasm.Context,
241278
_ chasm.NoValue,
@@ -286,11 +323,13 @@ func (o *Operation) loadStartArgs(
286323
}, nil
287324
}
288325

326+
// saveInvocationResultInput is the input to the Operation.saveInvocationResult method used in UpdateComponent.
289327
type saveInvocationResultInput struct {
290328
result invocationResult
291329
retryPolicy backoff.RetryPolicy
292330
}
293331

332+
// saveInvocationResult handles the outcome of the initial start call.
294333
func (o *Operation) saveInvocationResult(
295334
ctx chasm.MutableContext,
296335
input saveInvocationResultInput,
@@ -299,6 +338,8 @@ func (o *Operation) saveInvocationResult(
299338
case invocationResultOK:
300339
links := convertResponseLinks(r.response.Links, ctx.Logger())
301340
if r.response.Pending != nil {
341+
// An async operation transitions to STARTED here;
342+
// HandleNexusCompletion will apply its outcome from the completion callback.
302343
return nil, o.onStarted(ctx, r.response.Pending.Token, nil, links)
303344
}
304345
return nil, o.onCompleted(ctx, r.response.Successful, links)
@@ -388,17 +429,11 @@ func (o *Operation) buildDescribeResponse(
388429
resp.Input = o.RequestData.Get(ctx).GetInput()
389430
}
390431
if req.GetFrontendRequest().GetIncludeOutcome() && o.isClosed() {
391-
if successful, failure := o.describeOutcome(ctx); successful != nil {
432+
if successful, failure := o.outcome(ctx); successful != nil {
392433
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Result{
393434
Result: successful,
394435
}
395436
} else if failure != nil {
396-
if failure.GetTimeoutFailureInfo() != nil {
397-
failure = &failurepb.Failure{
398-
Message: "nexus operation timed out",
399-
Cause: failure,
400-
}
401-
}
402437
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Failure{
403438
Failure: failure,
404439
}
@@ -417,17 +452,11 @@ func (o *Operation) buildPollResponse(
417452

418453
if o.isClosed() {
419454
resp.WaitStage = enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED
420-
if successful, failure := o.describeOutcome(ctx); successful != nil {
455+
if successful, failure := o.outcome(ctx); successful != nil {
421456
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Result{
422457
Result: successful,
423458
}
424459
} else if failure != nil {
425-
if failure.GetTimeoutFailureInfo() != nil {
426-
failure = &failurepb.Failure{
427-
Message: "nexus operation timed out",
428-
Cause: failure,
429-
}
430-
}
431460
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Failure{
432461
Failure: failure,
433462
}
@@ -441,7 +470,7 @@ func (o *Operation) buildPollResponse(
441470
}
442471
}
443472

444-
func (o *Operation) describeOutcome(ctx chasm.Context) (*commonpb.Payload, *failurepb.Failure) {
473+
func (o *Operation) outcome(ctx chasm.Context) (*commonpb.Payload, *failurepb.Failure) {
445474
if !o.isClosed() {
446475
return nil, nil
447476
}

0 commit comments

Comments
 (0)