Skip to content

Commit 984be28

Browse files
committed
Propagate backlinks on Signal and Signal-with-Start responses
1 parent f9b2f6d commit 984be28

27 files changed

Lines changed: 482 additions & 241 deletions

api/historyservice/v1/request_response.pb.go

Lines changed: 190 additions & 169 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,3 +216,5 @@ require (
216216
modernc.org/mathutil v1.7.1 // indirect
217217
modernc.org/memory v1.11.0 // indirect
218218
)
219+
220+
replace go.temporal.io/api => github.com/long-nt-tran/api-go v0.0.0-20260413175257-fd07f9923cd0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
263263
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
264264
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
265265
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
266+
github.com/long-nt-tran/api-go v0.0.0-20260413175257-fd07f9923cd0 h1:mflAZ4Y+bO2+Nub/JSWq3lWnujm63XCvh6GwQvapKS0=
267+
github.com/long-nt-tran/api-go v0.0.0-20260413175257-fd07f9923cd0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
266268
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
267269
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
268270
github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
@@ -440,8 +442,6 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY
440442
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
441443
go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4=
442444
go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE=
443-
go.temporal.io/api v1.62.8 h1:g8RAZmdebYODoNa2GLA4M4TsXNe1096WV3n26C4+fdw=
444-
go.temporal.io/api v1.62.8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
445445
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
446446
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
447447
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,9 @@ message SignalWorkflowExecutionRequest {
491491
bool child_workflow_only = 4;
492492
}
493493

494-
message SignalWorkflowExecutionResponse {}
494+
message SignalWorkflowExecutionResponse {
495+
temporal.api.common.v1.Link link = 1;
496+
}
495497

496498
message SignalWithStartWorkflowExecutionRequest {
497499
option (routing).workflow_id = "signal_with_start_request.workflow_id";
@@ -505,6 +507,7 @@ message SignalWithStartWorkflowExecutionRequest {
505507
message SignalWithStartWorkflowExecutionResponse {
506508
string run_id = 1;
507509
bool started = 2;
510+
temporal.api.common.v1.Link signal_link = 3;
508511
}
509512

510513
message RemoveSignalMutableStateRequest {

service/frontend/workflow_handler.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2250,15 +2250,17 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context, request
22502250
return nil, err
22512251
}
22522252

2253-
_, err = wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
2253+
resp, err := wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
22542254
NamespaceId: namespaceID.String(),
22552255
SignalRequest: request,
22562256
})
22572257
if err != nil {
22582258
return nil, err
22592259
}
22602260

2261-
return &workflowservice.SignalWorkflowExecutionResponse{}, nil
2261+
return &workflowservice.SignalWorkflowExecutionResponse{
2262+
Link: resp.GetLink(),
2263+
}, nil
22622264
}
22632265

22642266
// SignalWithStartWorkflowExecution is used to ensure sending signal to a workflow.
@@ -2372,8 +2374,9 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
23722374
}
23732375

23742376
return &workflowservice.SignalWithStartWorkflowExecutionResponse{
2375-
RunId: resp.GetRunId(),
2376-
Started: resp.Started,
2377+
RunId: resp.GetRunId(),
2378+
Started: resp.Started,
2379+
SignalLink: resp.GetSignalLink(),
23772380
}, nil
23782381
}
23792382

service/history/api/create_workflow_util.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func NewWorkflowWithSignal(
8181
signalWithStartRequest.GetSignalInput(),
8282
signalWithStartRequest.GetIdentity(),
8383
signalWithStartRequest.GetHeader(),
84+
signalWithStartRequest.GetRequestId(),
8485
signalWithStartRequest.GetLinks(),
8586
); err != nil {
8687
return nil, err

service/history/api/link_util.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package api
2+
3+
import (
4+
commonpb "go.temporal.io/api/common/v1"
5+
enumspb "go.temporal.io/api/enums/v1"
6+
"go.temporal.io/server/common"
7+
)
8+
9+
// GenerateStartedEventRefLink builds a Link pointing to the WORKFLOW_EXECUTION_STARTED event.
10+
// Use this for backlinks to workflow start: the started event is always EventId=1 (FirstEventID)
11+
// and is never buffered, so a concrete EventReference is appropriate.
12+
func GenerateStartedEventRefLink(namespace, workflowID, runID string) *commonpb.Link {
13+
return &commonpb.Link{
14+
Variant: &commonpb.Link_WorkflowEvent_{
15+
WorkflowEvent: &commonpb.Link_WorkflowEvent{
16+
Namespace: namespace,
17+
WorkflowId: workflowID,
18+
RunId: runID,
19+
Reference: &commonpb.Link_WorkflowEvent_EventRef{
20+
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
21+
EventId: common.FirstEventID,
22+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
23+
},
24+
},
25+
},
26+
},
27+
}
28+
}
29+
30+
// GenerateRequestIDRefLink builds a Link with a RequestIdReference.
31+
// Use this for events that are buffered at signal time (e.g. SIGNALED), where the
32+
// concrete EventId is not yet known. The server resolves the RequestId to a real
33+
// EventId once the buffer flushes.
34+
func GenerateRequestIDRefLink(namespace, workflowID, runID, requestID string, eventType enumspb.EventType) *commonpb.Link {
35+
return &commonpb.Link{
36+
Variant: &commonpb.Link_WorkflowEvent_{
37+
WorkflowEvent: &commonpb.Link_WorkflowEvent{
38+
Namespace: namespace,
39+
WorkflowId: workflowID,
40+
RunId: runID,
41+
Reference: &commonpb.Link_WorkflowEvent_RequestIdRef{
42+
RequestIdRef: &commonpb.Link_WorkflowEvent_RequestIdReference{
43+
RequestId: requestID,
44+
EventType: eventType,
45+
},
46+
},
47+
},
48+
},
49+
}
50+
}

service/history/api/multioperation/api.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"go.temporal.io/api/serviceerror"
1111
"go.temporal.io/server/api/historyservice/v1"
1212
"go.temporal.io/server/api/matchingservice/v1"
13-
"go.temporal.io/server/common"
1413
"go.temporal.io/server/common/definition"
1514
"go.temporal.io/server/common/locks"
1615
"go.temporal.io/server/common/namespace"
@@ -338,20 +337,7 @@ func (uws *updateWithStart) updateWorkflow(
338337
RunId: currentWorkflowLease.GetContext().GetWorkflowKey().RunID,
339338
Started: false, // set explicitly for emphasis
340339
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
341-
Link: &commonpb.Link{
342-
Variant: &commonpb.Link_WorkflowEvent_{
343-
WorkflowEvent: &commonpb.Link_WorkflowEvent{
344-
WorkflowId: wfKey.WorkflowID,
345-
RunId: wfKey.RunID,
346-
Reference: &commonpb.Link_WorkflowEvent_EventRef{
347-
EventRef: &commonpb.Link_WorkflowEvent_EventReference{
348-
EventId: common.FirstEventID,
349-
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
350-
},
351-
},
352-
},
353-
},
354-
},
340+
Link: api.GenerateStartedEventRefLink("", wfKey.WorkflowID, wfKey.RunID),
355341
}
356342

357343
return makeResponse(startResp, updateResp), nil

service/history/api/signalwithstartworkflow/api.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,16 @@ func Invoke(
8989
api.ReactivateVersionWorkflowIfPinned(ctx, namespaceEntry, request.GetVersioningOverride(), reactivationSignalCache, reactivationSignaler, shard.GetConfig().EnableVersionReactivationSignals())
9090
}
9191

92+
swr := signalWithStartRequest.SignalWithStartRequest
9293
return &historyservice.SignalWithStartWorkflowExecutionResponse{
9394
RunId: runID,
9495
Started: started,
96+
SignalLink: api.GenerateRequestIDRefLink(
97+
swr.GetNamespace(),
98+
swr.GetWorkflowId(),
99+
runID,
100+
swr.GetRequestId(),
101+
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
102+
),
95103
}, nil
96104
}

service/history/api/signalwithstartworkflow/signal_with_start_workflow.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func signalWorkflow(
307307
request.GetSignalInput(),
308308
request.GetIdentity(),
309309
request.GetHeader(),
310+
request.GetRequestId(),
310311
request.GetLinks(),
311312
); err != nil {
312313
return err

0 commit comments

Comments
 (0)