Skip to content

Commit 46e0fa9

Browse files
committed
Nexus Standalone: wire up
1 parent 318bd03 commit 46e0fa9

12 files changed

Lines changed: 299 additions & 79 deletions

File tree

chasm/lib/nexusoperation/frontend.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,14 @@ func (h *frontendHandler) StartNexusOperationExecution(
8585
return nil, err
8686
}
8787

88-
// Verify the endpoint exists before creating the operation.
89-
if _, err := h.endpointRegistry.GetByName(ctx, namespaceID, req.GetEndpoint()); err != nil {
88+
endpoint, err := h.endpointRegistry.GetByName(ctx, namespaceID, req.GetEndpoint())
89+
if err != nil {
9090
return nil, err
9191
}
9292

9393
resp, err := h.client.StartNexusOperation(ctx, &nexusoperationpb.StartNexusOperationRequest{
9494
NamespaceId: namespaceID.String(),
95+
EndpointId: endpoint.Id,
9596
FrontendRequest: req,
9697
})
9798
return resp.GetFrontendResponse(), err

chasm/lib/nexusoperation/gen/nexusoperationpb/v1/operation.pb.go

Lines changed: 36 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/nexusoperation/gen/nexusoperationpb/v1/request_response.pb.go

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/nexusoperation/operation.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.temporal.io/server/chasm"
1616
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
1717
"go.temporal.io/server/common/backoff"
18+
"go.temporal.io/server/common/softassert"
1819
queueserrors "go.temporal.io/server/service/history/queues/errors"
1920
"google.golang.org/protobuf/types/known/durationpb"
2021
"google.golang.org/protobuf/types/known/timestamppb"
@@ -74,6 +75,7 @@ func newStandaloneOperation(
7475
) (*Operation, error) {
7576
frontendReq := req.GetFrontendRequest()
7677
op := NewOperation(&nexusoperationpb.OperationState{
78+
EndpointId: req.GetEndpointId(),
7779
Endpoint: frontendReq.GetEndpoint(),
7880
Service: frontendReq.GetService(),
7981
Operation: frontendReq.GetOperation(),
@@ -156,41 +158,29 @@ func (o *Operation) onStarted(ctx chasm.MutableContext, operationToken string, l
156158
if store, ok := o.Store.TryGet(ctx); ok {
157159
return store.OnNexusOperationStarted(ctx, o, operationToken, links)
158160
}
161+
o.Links = append(o.Links, links...)
159162
return TransitionStarted.Apply(o, ctx, EventStarted{OperationToken: operationToken})
160163
}
161164

162165
func (o *Operation) onCompleted(ctx chasm.MutableContext, result *commonpb.Payload, links []*commonpb.Link) error {
163166
if store, ok := o.Store.TryGet(ctx); ok {
164167
return store.OnNexusOperationCompleted(ctx, o, result, links)
165168
}
166-
outcome := o.outcome(ctx)
167-
outcome.Variant = &nexusoperationpb.OperationOutcome_Successful_{
168-
Successful: &nexusoperationpb.OperationOutcome_Successful{Result: result},
169-
}
170-
return TransitionSucceeded.Apply(o, ctx, EventSucceeded{})
169+
o.Links = append(o.Links, links...)
170+
return TransitionSucceeded.Apply(o, ctx, EventSucceeded{Result: result})
171171
}
172172

173173
func (o *Operation) onFailed(ctx chasm.MutableContext, cause *failurepb.Failure) error {
174174
if store, ok := o.Store.TryGet(ctx); ok {
175175
return store.OnNexusOperationFailed(ctx, o, cause)
176176
}
177-
if cause != nil {
178-
o.outcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Failed_{
179-
Failed: &nexusoperationpb.OperationOutcome_Failed{Failure: cause},
180-
}
181-
}
182177
return TransitionFailed.Apply(o, ctx, EventFailed{Failure: cause})
183178
}
184179

185180
func (o *Operation) onCanceled(ctx chasm.MutableContext, cause *failurepb.Failure) error {
186181
if store, ok := o.Store.TryGet(ctx); ok {
187182
return store.OnNexusOperationCanceled(ctx, o, cause)
188183
}
189-
if cause != nil {
190-
o.outcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Failed_{
191-
Failed: &nexusoperationpb.OperationOutcome_Failed{Failure: cause},
192-
}
193-
}
194184
return TransitionCanceled.Apply(o, ctx, EventCanceled{Failure: cause})
195185
}
196186

@@ -389,14 +379,25 @@ func (o *Operation) buildPollResponse(
389379
}
390380

391381
func (o *Operation) describeOutcome(ctx chasm.Context) (*commonpb.Payload, *failurepb.Failure) {
392-
outcome := o.Outcome.Get(ctx)
393-
if successful := outcome.GetSuccessful(); successful != nil {
394-
return successful.GetResult(), nil
395-
}
396-
if failure := outcome.GetFailed().GetFailure(); failure != nil {
397-
return nil, failure
382+
outcome, hasOutcome := o.Outcome.TryGet(ctx)
383+
384+
switch {
385+
case !hasOutcome:
386+
return nil, o.LastAttemptFailure
387+
case outcome.GetSuccessful() != nil:
388+
return outcome.GetSuccessful().GetResult(), nil
389+
case outcome.GetFailed() != nil:
390+
// Timeouts often close after one or more retryable attempt failures. For describe/poll, prefer the
391+
// last concrete attempt failure over the generic timeout wrapper because it is more actionable.
392+
// Note that LastAttemptFailure may be nil if the operation timed out before any attempt.
393+
if o.Status == nexusoperationpb.OPERATION_STATUS_TIMED_OUT && o.LastAttemptFailure != nil {
394+
return nil, o.LastAttemptFailure
395+
}
396+
return nil, outcome.GetFailed().GetFailure()
397+
default:
398+
softassert.Fail(ctx.Logger(), "operation outcome has no variant set")
399+
return nil, nil
398400
}
399-
return nil, o.LastAttemptFailure
400401
}
401402

402403
func (o *Operation) isWaitStageReached(_ chasm.Context, waitStage enumspb.NexusOperationWaitStage) bool {
@@ -441,6 +442,7 @@ func (o *Operation) buildExecutionInfo(ctx chasm.Context) *nexuspb.NexusOperatio
441442
},
442443
NexusHeader: requestData.GetNexusHeader(),
443444
UserMetadata: requestData.GetUserMetadata(),
445+
Links: o.Links,
444446
Identity: requestData.GetIdentity(),
445447
}
446448

chasm/lib/nexusoperation/operation_statemachine.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package nexusoperation
33
import (
44
"time"
55

6+
commonpb "go.temporal.io/api/common/v1"
67
failurepb "go.temporal.io/api/failure/v1"
78
"go.temporal.io/server/chasm"
89
"go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
@@ -160,7 +161,7 @@ type EventSucceeded struct {
160161
// If not nil, uses the provided time instead of the current component time.
161162
// Used when a completion comes in before start is recorded (rare race).
162163
CompleteTime *time.Time
163-
// TODO(stephan): Store result
164+
Result *commonpb.Payload
164165
}
165166

166167
var TransitionSucceeded = chasm.NewTransition(
@@ -179,6 +180,15 @@ var TransitionSucceeded = chasm.NewTransition(
179180
}
180181
o.NextAttemptScheduleTime = nil
181182
o.ClosedTime = timestamppb.New(closeTime)
183+
184+
// Result is set by standalone operations. Workflow-attached operations store
185+
// the result in the history event and remove the operation after this transition.
186+
if event.Result != nil {
187+
o.outcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Successful_{
188+
Successful: &nexusoperationpb.OperationOutcome_Successful{Result: event.Result},
189+
}
190+
}
191+
182192
// Terminal state - no tasks to emit.
183193
return nil
184194
},
@@ -252,8 +262,7 @@ var TransitionTerminated = chasm.NewTransition(
252262
}
253263
o.ClosedTime = timestamppb.New(ctx.Now(o))
254264
o.NextAttemptScheduleTime = nil
255-
outcome := o.Outcome.Get(ctx)
256-
outcome.Variant = &nexusoperationpb.OperationOutcome_Failed_{
265+
o.outcome(ctx).Variant = &nexusoperationpb.OperationOutcome_Failed_{
257266
Failed: &nexusoperationpb.OperationOutcome_Failed{
258267
Failure: &failurepb.Failure{
259268
Message: event.Reason,

chasm/lib/nexusoperation/operation_statemachine_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ var (
2222
)
2323

2424
func newTestOperation() *Operation {
25-
ctx := &chasm.MockMutableContext{}
2625
op := &Operation{
2726
OperationState: &nexusoperationpb.OperationState{
2827
Status: nexusoperationpb.OPERATION_STATUS_UNSPECIFIED,
@@ -36,7 +35,6 @@ func newTestOperation() *Operation {
3635
Attempt: 0,
3736
},
3837
}
39-
op.Outcome = chasm.NewDataField(ctx, &nexusoperationpb.OperationOutcome{})
4038
return op
4139
}
4240

chasm/lib/nexusoperation/operation_tasks.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ func (h *operationInvocationTaskHandler) Execute(
169169
header.Set(nexusrpc.HeaderTemporalNexusFailureSupport, "true")
170170
}
171171

172+
var links []nexus.Link
173+
if args.nexusLink != (nexus.Link{}) {
174+
links = []nexus.Link{args.nexusLink}
175+
}
176+
172177
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
173178
defer cancel()
174179
// Set this value on the parent context so that our custom HTTP caller can mutate it since we cannot
@@ -182,7 +187,7 @@ func (h *operationInvocationTaskHandler) Execute(
182187
CallbackHeader: nexus.Header{
183188
commonnexus.CallbackTokenHeader: token,
184189
},
185-
Links: []nexus.Link{args.nexusLink},
190+
Links: links,
186191
}
187192

188193
invocation, err := h.newInvocation(callCtx, ns, endpoint, opRef, args, task, callTimeout, timeoutType)

0 commit comments

Comments
 (0)