Skip to content

Commit bc0354e

Browse files
authored
time-skipping propagation (#10013)
## What changed and Why? Paired with [api change](https://github.com/temporalio/api/pull/770/changes) Define the default time-skipping propagation behavior for current features: continue-as-new (CAN), child workflows, retry, and reset. **Group 1 — Inherit both from the current execution:** - Continue-as-new (CAN): inherits the current config and accumulated skipped duration; the configured bound is shared across both the inherited skipped duration and any duration skipped by the new run. - Child workflows: same behavior as CAN This design is because CAN is a technical reason to start a new run of current run, so logically they can be viewed as a same "run". For Child WFs they can be viewed as an extension of previous workflows or separate workflows, and in either case, they shall inherit the skipped duration so that the virtual time doesn't rewind back, and the default behavior is designed to treat them as an extension of the parent and share the config. We only consider adding new config to provide flexibility on demand. **Group 2 — Inherit from a specific point in history:** - Retry: inherits the config and skipped duration recorded in the StartWorkflowExecutionEvent of the current workflow, since retry is defined as restarting execution from that event - Cron: same with Retry **Group3 -- Both inherit from a specific point and catch up of config change to current** - Reset: retains the current time-skipping config, since reset is designed to replay all events up to the reset point and apply any UpdateWorkflowExecutionOptions changes made after that point — with no option to exclude them (covered by tests only) ## How did you test it? - [x] built - [x] run locally and tested manually - [x] covered by existing tests - [x] added new unit test(s) - [x] added new functional test(s)
1 parent 5e825f1 commit bc0354e

18 files changed

Lines changed: 1765 additions & 362 deletions

api/historyservice/v1/request_response.pb.go

Lines changed: 271 additions & 258 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ require (
6363
go.opentelemetry.io/otel/sdk v1.43.0
6464
go.opentelemetry.io/otel/sdk/metric v1.43.0
6565
go.opentelemetry.io/otel/trace v1.43.0
66-
go.temporal.io/api v1.62.11
66+
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d
6767
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
6868
go.temporal.io/sdk v1.41.1
6969
go.uber.org/fx v1.24.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
469469
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
470470
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
471471
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
472-
go.temporal.io/api v1.62.11 h1:MWDaooDvOJCIRb1atqeZX2ErDPNTsNc3/mMEVEvvaVU=
473-
go.temporal.io/api v1.62.11/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
472+
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d h1:On7TmNeQ/mm1fxkXCn2Aqqf9Sy8GgcKPJUZunqA7Wpo=
473+
go.temporal.io/api v1.62.12-0.20260424184119-9015efabce8d/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
474474
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
475475
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
476476
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ message StartWorkflowExecutionRequest {
104104
// directly from the started event. Written onto the new run's
105105
// WorkflowExecutionStartedEvent.
106106
temporal.api.history.v1.DeclinedTargetVersionUpgrade declined_target_version_upgrade = 17;
107+
108+
// If a workflow execution is started by a previous execution (parent-child workflow or continue-as-new)
109+
// that has already skipped some time, the accumulated skipped duration from that execution
110+
// can be passed to the new workflow execution through this field.
111+
google.protobuf.Duration initial_skipped_duration = 18;
107112
}
108113

109114
message StartWorkflowExecutionResponse {

service/frontend/workflow_handler.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -734,14 +734,6 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig(
734734
namespace.MinTimeSkippingDuration,
735735
)
736736
}
737-
// todo: need to adapt the timeSource after time-skipping timeSource is implemented
738-
case *workflowpb.TimeSkippingConfig_MaxTargetTime:
739-
if bound.MaxTargetTime.AsTime().Before(wh.namespaceHandler.timeSource.Now().Add(namespace.MinTimeSkippingDuration)) {
740-
return serviceerror.NewInvalidArgumentf(
741-
"Max target time must be at least %s from current time of the workflow",
742-
namespace.MinTimeSkippingDuration,
743-
)
744-
}
745737
default:
746738
return serviceerror.NewInvalidArgumentf("unsupported time skipping bound type: %T", bound)
747739
}

service/frontend/workflow_handler_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3296,17 +3296,6 @@ func (s *WorkflowHandlerSuite) TestValidateTimeSkippingConfig() {
32963296
Bound: &workflowpb.TimeSkippingConfig_MaxElapsedDuration{MaxElapsedDuration: durationpb.New(namespace.MinTimeSkippingDuration)},
32973297
}, s.testNamespace))
32983298

3299-
// MaxTargetTime less than 1 minute from now is rejected
3300-
s.Require().ErrorAs(wh.validateTimeSkippingConfig(&workflowpb.TimeSkippingConfig{
3301-
Enabled: true,
3302-
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{MaxTargetTime: timestamppb.New(time.Now().Add(halfMinDuration))},
3303-
}, s.testNamespace), &invalidArgumentErr)
3304-
3305-
// MaxTargetTime well beyond 1 minute from now is valid
3306-
s.Require().NoError(wh.validateTimeSkippingConfig(&workflowpb.TimeSkippingConfig{
3307-
Enabled: true,
3308-
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{MaxTargetTime: timestamppb.New(time.Now().Add(2 * namespace.MinTimeSkippingDuration))},
3309-
}, s.testNamespace))
33103299
}
33113300

33123301
// TestExecuteMultiOperation_TimeSkipping_DCDisabled verifies that when the DC gate is off,

service/history/api/updateworkflowoptions/api.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -251,13 +251,6 @@ func mergeWorkflowExecutionOptions(
251251
mergeInto.TimeSkippingConfig.Enabled = mergeFrom.GetTimeSkippingConfig().GetEnabled()
252252
}
253253

254-
if _, ok := updateFields["timeSkippingConfig.disablePropagation"]; ok {
255-
if mergeInto.TimeSkippingConfig == nil {
256-
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
257-
}
258-
mergeInto.TimeSkippingConfig.DisablePropagation = mergeFrom.GetTimeSkippingConfig().GetDisablePropagation()
259-
}
260-
261254
if _, ok := updateFields["timeSkippingConfig.maxSkippedDuration"]; ok {
262255
if mergeInto.TimeSkippingConfig == nil {
263256
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
@@ -276,14 +269,5 @@ func mergeWorkflowExecutionOptions(
276269
}
277270
}
278271

279-
if _, ok := updateFields["timeSkippingConfig.maxTargetTime"]; ok {
280-
if mergeInto.TimeSkippingConfig == nil {
281-
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
282-
}
283-
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxTargetTime{
284-
MaxTargetTime: mergeFrom.GetTimeSkippingConfig().GetMaxTargetTime(),
285-
}
286-
}
287-
288272
return mergeInto, nil
289273
}

service/history/api/updateworkflowoptions/api_test.go

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"google.golang.org/protobuf/proto"
3131
"google.golang.org/protobuf/types/known/durationpb"
3232
"google.golang.org/protobuf/types/known/fieldmaskpb"
33-
"google.golang.org/protobuf/types/known/timestamppb"
3433
)
3534

3635
type noopVersionCache struct{}
@@ -338,7 +337,6 @@ func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
338337
oneHour := durationpb.New(time.Hour)
339338
twoHours := durationpb.New(2 * time.Hour)
340339
thirtyMin := durationpb.New(30 * time.Minute)
341-
targetTime := timestamppb.New(time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC))
342340

343341
testCases := []struct {
344342
name string
@@ -348,10 +346,9 @@ func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
348346
expectedConfig *workflowpb.TimeSkippingConfig
349347
}{
350348
{
351-
name: "update max_skipped_duration preserves enabled and disable_propagation",
349+
name: "update max_skipped_duration preserves enabled",
352350
initialConfig: &workflowpb.TimeSkippingConfig{
353-
Enabled: true,
354-
DisablePropagation: true,
351+
Enabled: true,
355352
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
356353
MaxSkippedDuration: oneHour,
357354
},
@@ -365,8 +362,7 @@ func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
365362
},
366363
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.max_skipped_duration"}},
367364
expectedConfig: &workflowpb.TimeSkippingConfig{
368-
Enabled: true,
369-
DisablePropagation: true,
365+
Enabled: true,
370366
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
371367
MaxSkippedDuration: twoHours,
372368
},
@@ -413,29 +409,6 @@ func TestMergeAndApply_TimeSkippingConfig(t *testing.T) {
413409
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.enabled"}},
414410
expectedConfig: &workflowpb.TimeSkippingConfig{Enabled: false},
415411
},
416-
{
417-
name: "change bound type to max_target_time preserves enabled",
418-
initialConfig: &workflowpb.TimeSkippingConfig{
419-
Enabled: true,
420-
Bound: &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
421-
MaxSkippedDuration: oneHour,
422-
},
423-
},
424-
updateOptions: &workflowpb.WorkflowExecutionOptions{
425-
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{
426-
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{
427-
MaxTargetTime: targetTime,
428-
},
429-
},
430-
},
431-
updateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config.max_target_time"}},
432-
expectedConfig: &workflowpb.TimeSkippingConfig{
433-
Enabled: true,
434-
Bound: &workflowpb.TimeSkippingConfig_MaxTargetTime{
435-
MaxTargetTime: targetTime,
436-
},
437-
},
438-
},
439412
}
440413

441414
for _, tc := range testCases {

service/history/historybuilder/event_factory.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ func (b *EventFactory) CreateWorkflowExecutionStartedEvent(
8888
if req.TimeSkippingConfig != nil {
8989
attributes.TimeSkippingConfig = req.TimeSkippingConfig
9090
}
91+
if request.GetInitialSkippedDuration() != nil {
92+
attributes.InitialSkippedDuration = request.GetInitialSkippedDuration()
93+
}
9194

9295
parentInfo := request.ParentExecutionInfo
9396
if parentInfo != nil {
@@ -840,6 +843,8 @@ func (b *EventFactory) CreateStartChildWorkflowExecutionInitiatedEvent(
840843
workflowTaskCompletedEventID int64,
841844
command *commandpb.StartChildWorkflowExecutionCommandAttributes,
842845
targetNamespaceID namespace.ID,
846+
timeSkippingConfig *workflowpb.TimeSkippingConfig,
847+
initialSkippedDuration *durationpb.Duration,
843848
) *historypb.HistoryEvent {
844849
event := b.createHistoryEvent(enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED, b.timeSource.Now())
845850
event.Attributes = &historypb.HistoryEvent_StartChildWorkflowExecutionInitiatedEventAttributes{
@@ -862,11 +867,13 @@ func (b *EventFactory) CreateStartChildWorkflowExecutionInitiatedEvent(
862867
// Filter nil values here rather than in the API layer because not all
863868
// creation paths go through the frontend (continue-as-new, child workflows, replication).
864869
// This CaN event is created on the parent workflow, so we need to filter nil values here.
865-
Memo: payload.FilterNilMemo(command.Memo),
866-
SearchAttributes: payload.FilterNilSearchAttributes(command.SearchAttributes),
867-
ParentClosePolicy: command.GetParentClosePolicy(),
868-
InheritBuildId: command.InheritBuildId, //nolint:staticcheck // SA1019: worker versioning v0.2
869-
Priority: command.Priority,
870+
Memo: payload.FilterNilMemo(command.Memo),
871+
SearchAttributes: payload.FilterNilSearchAttributes(command.SearchAttributes),
872+
ParentClosePolicy: command.GetParentClosePolicy(),
873+
InheritBuildId: command.InheritBuildId, //nolint:staticcheck // SA1019: worker versioning v0.2
874+
Priority: command.Priority,
875+
TimeSkippingConfig: timeSkippingConfig,
876+
InitialSkippedDuration: initialSkippedDuration,
870877
},
871878
}
872879
return event

service/history/historybuilder/history_builder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,11 +770,15 @@ func (b *HistoryBuilder) AddStartChildWorkflowExecutionInitiatedEvent(
770770
workflowTaskCompletedEventID int64,
771771
command *commandpb.StartChildWorkflowExecutionCommandAttributes,
772772
targetNamespaceID namespace.ID,
773+
timeSkippingConfig *workflowpb.TimeSkippingConfig,
774+
initialSkippedDuration *durationpb.Duration,
773775
) *historypb.HistoryEvent {
774776
event := b.EventFactory.CreateStartChildWorkflowExecutionInitiatedEvent(
775777
workflowTaskCompletedEventID,
776778
command,
777779
targetNamespaceID,
780+
timeSkippingConfig,
781+
initialSkippedDuration,
778782
)
779783
event, _ = b.EventStore.add(event)
780784
return event

0 commit comments

Comments
 (0)