Skip to content

Commit 8342fc5

Browse files
stephanosclaudebergundy
committed
Nexus Standalone: ScheduleToCloseTimeout + StartToCloseTimeout (#10014)
## What changed? Added support for ScheduleToCloseTimeout and StartToCloseTimeout to Standalone Nexus. ## Why? Added [missing timeouts to API](temporalio/api@1d12d58). ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [x] added new functional test(s) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Roey Berman <roey@temporal.io>
1 parent 3ebe60d commit 8342fc5

10 files changed

Lines changed: 335 additions & 222 deletions

File tree

chasm/lib/nexusoperation/operation.go

Lines changed: 60 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type OperationStore interface {
5454
OnNexusOperationStarted(ctx chasm.MutableContext, operation *Operation, operationToken string, startTime *time.Time, links []*commonpb.Link) error
5555
OnNexusOperationCanceled(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
5656
OnNexusOperationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
57-
OnNexusOperationTimedOut(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
57+
OnNexusOperationTimedOut(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure, fromAttempt bool) error
5858
OnNexusOperationCompleted(ctx chasm.MutableContext, operation *Operation, result *commonpb.Payload, links []*commonpb.Link) error
5959
OnNexusOperationCancellationCompleted(ctx chasm.MutableContext, operation *Operation) error
6060
OnNexusOperationCancellationFailed(ctx chasm.MutableContext, operation *Operation, cause *failurepb.Failure) error
@@ -162,10 +162,8 @@ func (o *Operation) RequestCancel(
162162
return nil
163163
}
164164

165-
// onStarted applies the started transition or delegates to the store if one is present.
166165
func (o *Operation) onStarted(ctx chasm.MutableContext, operationToken string, startTime *time.Time, links []*commonpb.Link) error {
167-
store, ok := o.Store.TryGet(ctx)
168-
if ok {
166+
if store, ok := o.Store.TryGet(ctx); ok {
169167
return store.OnNexusOperationStarted(ctx, o, operationToken, startTime, links)
170168
}
171169
o.Links = append(o.Links, links...)
@@ -197,16 +195,45 @@ func (o *Operation) onCanceled(ctx chasm.MutableContext, cause *failurepb.Failur
197195
return TransitionCanceled.Apply(o, ctx, EventCanceled{Failure: cause})
198196
}
199197

200-
func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failure) error {
198+
func (o *Operation) onTimedOut(ctx chasm.MutableContext, cause *failurepb.Failure, fromAttempt bool) error {
201199
if store, ok := o.Store.TryGet(ctx); ok {
202-
return store.OnNexusOperationTimedOut(ctx, o, cause)
200+
return store.OnNexusOperationTimedOut(ctx, o, cause, fromAttempt)
203201
}
204-
if cause != nil {
205-
o.getOrCreateOutcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Failed_{
206-
Failed: &nexusoperationpb.OperationOutcome_Failed{Failure: cause},
202+
return TransitionTimedOut.Apply(o, ctx, EventTimedOut{
203+
Failure: cause,
204+
FromAttempt: fromAttempt,
205+
})
206+
}
207+
208+
func (o *Operation) HandleNexusCompletion(
209+
ctx chasm.MutableContext,
210+
completion *persistencespb.ChasmNexusCompletion,
211+
) error {
212+
if completion.GetRequestId() != "" && o.GetRequestId() != completion.GetRequestId() {
213+
return serviceerror.NewNotFound("operation not found")
214+
}
215+
216+
links := completion.GetLinks()
217+
218+
if o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED {
219+
startTime := timestamp.TimeValuePtr(completion.GetStartTime())
220+
if err := o.onStarted(ctx, completion.GetOperationToken(), startTime, links); err != nil {
221+
return err
207222
}
223+
links = nil
224+
}
225+
226+
switch outcome := completion.Outcome.(type) {
227+
case *persistencespb.ChasmNexusCompletion_Success:
228+
return o.onCompleted(ctx, outcome.Success, links)
229+
case *persistencespb.ChasmNexusCompletion_Failure:
230+
if outcome.Failure.GetCanceledFailureInfo() != nil {
231+
return o.onCanceled(ctx, outcome.Failure)
232+
}
233+
return o.onFailed(ctx, outcome.Failure)
234+
default:
235+
return serviceerror.NewInvalidArgument("invalid completion outcome")
208236
}
209-
return TransitionTimedOut.Apply(o, ctx, EventTimedOut{})
210237
}
211238

212239
func (o *Operation) loadStartArgs(
@@ -272,8 +299,6 @@ func (o *Operation) saveInvocationResult(
272299
case invocationResultOK:
273300
links := convertResponseLinks(r.response.Links, ctx.Logger())
274301
if r.response.Pending != nil {
275-
// An async operation transitions to STARTED here;
276-
// HandleNexusCompletion will apply its outcome from the completion callback.
277302
return nil, o.onStarted(ctx, r.response.Pending.Token, nil, links)
278303
}
279304
return nil, o.onCompleted(ctx, r.response.Successful, links)
@@ -282,7 +307,7 @@ func (o *Operation) saveInvocationResult(
282307
case invocationResultFail:
283308
return nil, o.onFailed(ctx, r.failure)
284309
case invocationResultTimeout:
285-
return nil, o.onTimedOut(ctx, r.failure)
310+
return nil, o.onTimedOut(ctx, r.failure, true)
286311
case invocationResultRetry:
287312
return nil, transitionAttemptFailed.Apply(o, ctx, EventAttemptFailed{
288313
Failure: r.failure,
@@ -293,54 +318,22 @@ func (o *Operation) saveInvocationResult(
293318
}
294319
}
295320

296-
// HandleNexusCompletion handles the outcome of an asynchronous completion callback.
297-
func (o *Operation) HandleNexusCompletion(
298-
ctx chasm.MutableContext,
299-
completion *persistencespb.ChasmNexusCompletion,
300-
) error {
301-
// Request ID lets us reject a stale or misrouted completion.
302-
if completion.GetRequestId() != "" && o.GetRequestId() != completion.GetRequestId() {
303-
return serviceerror.NewNotFound("operation not found")
304-
}
305-
306-
links := completion.GetLinks()
307-
308-
// For completion-before-start, apply the started transition first.
309-
if o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED {
310-
startTime := timestamp.TimeValuePtr(completion.GetStartTime())
311-
if err := o.onStarted(ctx, completion.GetOperationToken(), startTime, links); err != nil {
312-
return err
313-
}
314-
315-
// Links belong only to the synthetic started event.
316-
links = nil
317-
}
318-
319-
switch outcome := completion.Outcome.(type) {
320-
case *persistencespb.ChasmNexusCompletion_Success:
321-
return o.onCompleted(ctx, outcome.Success, links)
322-
case *persistencespb.ChasmNexusCompletion_Failure:
323-
if outcome.Failure.GetCanceledFailureInfo() != nil {
324-
return o.onCanceled(ctx, outcome.Failure)
325-
}
326-
return o.onFailed(ctx, outcome.Failure)
327-
default:
328-
return serviceerror.NewInvalidArgument("invalid completion outcome")
329-
}
330-
}
331-
332-
func (o *Operation) resolveUnsuccessfully(ctx chasm.MutableContext, failure *failurepb.Failure, closeTime time.Time) error {
333-
if o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED {
321+
// resolveUnsuccessfully finalizes the operation. When fromAttempt is true, the failure is recorded as
322+
// LastAttemptFailure. Otherwise the failure is recorded as the terminal Outcome.
323+
func (o *Operation) resolveUnsuccessfully(ctx chasm.MutableContext, failure *failurepb.Failure, closeTime time.Time, fromAttempt bool) error {
324+
softassert.That(ctx.Logger(), failure != nil, "resolveUnsuccessfully called with nil failure")
325+
if fromAttempt {
334326
o.LastAttemptCompleteTime = timestamppb.New(ctx.Now(o))
335327
o.LastAttemptFailure = failure
336-
}
337-
o.ClosedTime = timestamppb.New(closeTime)
338-
o.NextAttemptScheduleTime = nil
339-
if failure != nil {
328+
} else {
340329
o.getOrCreateOutcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Failed_{
341330
Failed: &nexusoperationpb.OperationOutcome_Failed{Failure: failure},
342331
}
343332
}
333+
o.ClosedTime = timestamppb.New(closeTime)
334+
335+
// NextAttemptScheduleTime is only valid in BACKING_OFF; clear on close
336+
o.NextAttemptScheduleTime = nil
344337
return nil
345338
}
346339

@@ -400,6 +393,12 @@ func (o *Operation) buildDescribeResponse(
400393
Result: successful,
401394
}
402395
} else if failure != nil {
396+
if failure.GetTimeoutFailureInfo() != nil {
397+
failure = &failurepb.Failure{
398+
Message: "nexus operation timed out",
399+
Cause: failure,
400+
}
401+
}
403402
resp.Outcome = &workflowservice.DescribeNexusOperationExecutionResponse_Failure{
404403
Failure: failure,
405404
}
@@ -423,6 +422,12 @@ func (o *Operation) buildPollResponse(
423422
Result: successful,
424423
}
425424
} else if failure != nil {
425+
if failure.GetTimeoutFailureInfo() != nil {
426+
failure = &failurepb.Failure{
427+
Message: "nexus operation timed out",
428+
Cause: failure,
429+
}
430+
}
426431
resp.Outcome = &workflowservice.PollNexusOperationExecutionResponse_Failure{
427432
Failure: failure,
428433
}

chasm/lib/nexusoperation/operation_statemachine.go

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ var TransitionFailed = chasm.NewTransition(
210210
if event.CompleteTime != nil {
211211
closeTime = *event.CompleteTime
212212
}
213-
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime)
213+
// Attempts only execute in SCHEDULED, so that status identifies attempt-originated failures.
214+
fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED
215+
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt)
214216
},
215217
)
216218

@@ -234,7 +236,9 @@ var TransitionCanceled = chasm.NewTransition(
234236
if event.CompleteTime != nil {
235237
closeTime = *event.CompleteTime
236238
}
237-
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime)
239+
// Attempts only execute in SCHEDULED, so that status identifies attempt-originated cancels.
240+
fromAttempt := o.GetStatus() == nexusoperationpb.OPERATION_STATUS_SCHEDULED
241+
return o.resolveUnsuccessfully(ctx, event.Failure, closeTime, fromAttempt)
238242
},
239243
)
240244

@@ -252,30 +256,27 @@ var TransitionTerminated = chasm.NewTransition(
252256
},
253257
nexusoperationpb.OPERATION_STATUS_TERMINATED,
254258
func(o *Operation, ctx chasm.MutableContext, event EventTerminated) error {
259+
closeTime := ctx.Now(o)
255260
o.TerminateState = &nexusoperationpb.NexusOperationTerminateState{
256261
RequestId: event.RequestID,
257262
}
258-
o.ClosedTime = timestamppb.New(ctx.Now(o))
259-
o.NextAttemptScheduleTime = nil
260-
outcome := o.Outcome.Get(ctx)
261-
outcome.Variant = &nexusoperationpb.OperationOutcome_Failed_{
262-
Failed: &nexusoperationpb.OperationOutcome_Failed{
263-
Failure: &failurepb.Failure{
264-
Message: event.Reason,
265-
FailureInfo: &failurepb.Failure_TerminatedFailureInfo{
266-
TerminatedFailureInfo: &failurepb.TerminatedFailureInfo{
267-
Identity: event.Identity,
268-
},
269-
},
263+
failure := &failurepb.Failure{
264+
Message: event.Reason,
265+
FailureInfo: &failurepb.Failure_TerminatedFailureInfo{
266+
TerminatedFailureInfo: &failurepb.TerminatedFailureInfo{
267+
Identity: event.Identity,
270268
},
271269
},
272270
}
273-
return nil
271+
return o.resolveUnsuccessfully(ctx, failure, closeTime, false)
274272
},
275273
)
276274

277275
// EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
278276
type EventTimedOut struct {
277+
Failure *failurepb.Failure
278+
// FromAttempt is true when the failure came from an invocation attempt.
279+
FromAttempt bool
279280
}
280281

281282
var TransitionTimedOut = chasm.NewTransition(
@@ -286,11 +287,6 @@ var TransitionTimedOut = chasm.NewTransition(
286287
},
287288
nexusoperationpb.OPERATION_STATUS_TIMED_OUT,
288289
func(o *Operation, ctx chasm.MutableContext, event EventTimedOut) error {
289-
// Clear the next attempt schedule time when leaving BACKING_OFF state. This field is only valid in
290-
// BACKING_OFF state.
291-
o.NextAttemptScheduleTime = nil
292-
o.ClosedTime = timestamppb.New(ctx.Now(o))
293-
// Terminal state - no tasks to emit.
294-
return nil
290+
return o.resolveUnsuccessfully(ctx, event.Failure, ctx.Now(o), event.FromAttempt)
295291
},
296292
)

0 commit comments

Comments
 (0)