From d7baa51e8327ea5f4db7904dc1bfd8aeb3b4908e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 4 Dec 2025 11:51:14 -0500 Subject: [PATCH 01/11] add failing test --- crates/common/src/protos/canned_histories.rs | 21 +++++- .../include/temporal-sdk-core-c-bridge.h | 30 ++++---- .../workflow_tests/local_activities.rs | 68 +++++++++++++++++++ 3 files changed, 103 insertions(+), 16 deletions(-) diff --git a/crates/common/src/protos/canned_histories.rs b/crates/common/src/protos/canned_histories.rs index 1ca4901f9..4335e11da 100644 --- a/crates/common/src/protos/canned_histories.rs +++ b/crates/common/src/protos/canned_histories.rs @@ -1324,7 +1324,8 @@ pub fn single_child_workflow_try_cancelled(child_wf_id: &str) -> TestHistoryBuil /// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED /// 5: EVENT_TYPE_MARKER_RECORDED (la result) /// 7: EVENT_TYPE_MARKER_RECORDED (la result) -/// 8: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -1340,6 +1341,24 @@ pub fn two_local_activities_one_wft(parallel: bool) -> TestHistoryBuilder { t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_MARKER_RECORDED (LA 2 result) +/// 7: EVENT_TYPE_MARKER_RECORDED (LA 1 result) +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn parallel_las_job_order_hist() -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_local_activity_result_marker(2, "2", b"hi2".into()); + t.add_local_activity_result_marker(1, "1", b"hi1".into()); + t.add_workflow_task_scheduled_and_started(); + t 
+} + /// Useful for one-of needs to write a crafted history to a file. Writes it as serialized proto /// binary to the provided path. pub fn write_hist_to_binfile( diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index 25c38f946..80e997027 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -5,13 +5,13 @@ #include #include -typedef enum TemporalCoreRpcService { - Workflow = 1, - Operator, - Cloud, - Test, - Health, -} TemporalCoreRpcService; +typedef enum TemporalCoreForwardedLogLevel { + Trace = 0, + Debug, + Info, + Warn, + Error, +} TemporalCoreForwardedLogLevel; typedef enum TemporalCoreMetricAttributeValueType { String = 1, @@ -29,14 +29,6 @@ typedef enum TemporalCoreMetricKind { GaugeFloat, } TemporalCoreMetricKind; -typedef enum TemporalCoreForwardedLogLevel { - Trace = 0, - Debug, - Info, - Warn, - Error, -} TemporalCoreForwardedLogLevel; - typedef enum TemporalCoreOpenTelemetryMetricTemporality { Cumulative = 1, Delta, @@ -47,6 +39,14 @@ typedef enum TemporalCoreOpenTelemetryProtocol { Http, } TemporalCoreOpenTelemetryProtocol; +typedef enum TemporalCoreRpcService { + Workflow = 1, + Operator, + Cloud, + Test, + Health, +} TemporalCoreRpcService; + typedef enum TemporalCoreSlotKindType { WorkflowSlotKindType, ActivitySlotKindType, diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index 2dff951a0..479cd799a 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -3074,6 +3074,74 @@ async fn las_separated_by_timer(#[case] replay: bool) { worker.run().await.unwrap(); } +#[workflow] +#[derive(Default)] +struct ParallelLasJobOrderWf; + +#[workflow_methods] +impl 
ParallelLasJobOrderWf { + #[run(name = DEFAULT_WORKFLOW_TYPE)] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + let _ = temporalio_sdk::workflows::join!( + ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()), + ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()) + ); + Ok(()) + } +} + +#[rstest] +#[tokio::test] +async fn parallel_las_job_order(#[values(true, false)] replay: bool) { + let t = canned_histories::parallel_las_job_order_hist(); + let mut mock_cfg = if replay { + MockPollCfg::from_resps(t, [ResponseType::AllHistory]) + } else { + MockPollCfg::from_hist_builder(t) + }; + + let mut aai = ActivationAssertionsInterceptor::default(); + // Verify ResolveActivity jobs are received in completion order (seq 2 first, then seq 1) + // This catches the bug where they might be sent in request order instead + aai.skip_one().then(move |a| { + assert_matches!( + a.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra1)) + }, WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra2)) + }] => {assert_eq!(ra1.seq, 2); assert_eq!(ra2.seq, 1)} + ); + }); + + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts.then(move |wft| { + let commands = &wft.commands; + if !replay { + assert_eq!(commands.len(), 3); + assert_eq!(commands[0].command_type(), CommandType::RecordMarker); + assert_eq!(commands[1].command_type(), CommandType::RecordMarker); + assert_matches!( + commands[2].command_type(), + CommandType::CompleteWorkflowExecution + ); + } else { + assert_eq!(commands.len(), 1); + assert_matches!( + commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + } + }); + }); + + let mut worker = build_fake_sdk(mock_cfg); + worker.set_worker_interceptor(aai); + worker.register_workflow::(); + worker.register_activities(ResolvedActivity); + 
worker.run().await.unwrap(); +} + #[tokio::test] async fn one_la_heartbeating_wft_failure_still_executes() { let mut t = TestHistoryBuilder::default(); From ac4ddeba6bb7b8c787ba7b00022a76d8c04f023e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 09:42:24 -0500 Subject: [PATCH 02/11] initial fix --- .../machines/local_activity_state_machine.rs | 169 ++++++++++-------- .../workflow/machines/workflow_machines.rs | 26 ++- .../machines/workflow_machines/local_acts.rs | 20 ++- .../src/worker/workflow/managed_run.rs | 8 + 4 files changed, 140 insertions(+), 83 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 6a12e11dd..48f639d1c 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -48,8 +48,7 @@ fsm! { // is replaying), and then immediately scheduled and transitions to either requesting that lang // execute the activity, or waiting for the marker from history. Executing --(Schedule, shared on_schedule) --> RequestSent; - Replaying --(Schedule, on_schedule) --> WaitingMarkerEvent; - ReplayingPreResolved --(Schedule, on_schedule) --> WaitingMarkerEventPreResolved; + Replaying --(Schedule, on_schedule) --> WaitingResolveFromMarkerLookAhead; // Execution path ============================================================================= RequestSent --(HandleResult(ResolveDat), on_handle_result) --> MarkerCommandCreated; @@ -66,32 +65,29 @@ fsm! 
{ --> MarkerCommandRecorded; // Replay path ================================================================================ - // LAs on the replay path always need to eventually see the marker - WaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) - --> MarkerCommandRecorded; + //WaitingResolveFromMarkerLookAhead --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) --> MarkerCommandRecorded; + WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; // If we are told to cancel while waiting for the marker, we still need to wait for the marker. - WaitingMarkerEvent --(Cancel, on_cancel_requested) --> WaitingMarkerEvent; + WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead; + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(Cancel, on_cancel_requested) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; + // Because there could be non-heartbeat WFTs (ex: signals being received) between scheduling // the LA and the marker being recorded, peekahead might not always resolve the LA *before* // scheduling it. This transition accounts for that. 
- WaitingMarkerEvent --(HandleKnownResult(ResolveDat), on_handle_result) --> WaitingMarkerEvent; - WaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType), - on_no_wait_cancel) --> WaitingMarkerEvent; + WaitingResolveFromMarkerLookAhead --(NoWaitCancel(ActivityCancellationType), + on_no_wait_cancel) --> WaitingResolveFromMarkerLookAhead; + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(NoWaitCancel(ActivityCancellationType), + on_no_wait_cancel) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; + + // LAs on the replay path always need to eventually see the marker + ResolvedFromMarkerLookAheadWaitingMarkerEvent --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) + --> MarkerCommandRecorded; // It is entirely possible to have started the LA while replaying, only to find that we have // reached a new WFT and there still was no marker. In such cases we need to execute the LA. // This can easily happen if upon first execution, the worker does WFT heartbeating but then // dies for some reason. - WaitingMarkerEvent --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent; - - // If the activity is pre resolved we still expect to see marker recorded event at some point, - // even though we already resolved the activity. - WaitingMarkerEventPreResolved --(MarkerRecorded(CompleteLocalActivityData), - shared on_marker_recorded) --> MarkerCommandRecorded; - // Ignore cancellations when waiting for the marker after being pre-resolved - WaitingMarkerEventPreResolved --(Cancel) --> WaitingMarkerEventPreResolved; - WaitingMarkerEventPreResolved --(NoWaitCancel(ActivityCancellationType)) - --> WaitingMarkerEventPreResolved; + WaitingResolveFromMarkerLookAhead --(StartedNonReplayWFT, shared on_started_non_replay_wft) --> RequestSent; // Ignore cancellation in final state MarkerCommandRecorded --(Cancel, on_cancel_requested) --> MarkerCommandRecorded; @@ -101,6 +97,7 @@ fsm! 
{ // LAs reporting status after they've handled their result can simply be ignored. We could // optimize this away higher up but that feels very overkill. MarkerCommandCreated --(HandleResult(ResolveDat)) --> MarkerCommandCreated; + MarkerCommandCreated --(HandleKnownResult(ResolveDat)) --> MarkerCommandCreated; ResultNotified --(HandleResult(ResolveDat)) --> ResultNotified; MarkerCommandRecorded --(HandleResult(ResolveDat)) --> MarkerCommandRecorded; } @@ -151,22 +148,12 @@ impl From for ResolveDat { pub(super) fn new_local_activity( mut attrs: ValidScheduleLA, replaying_when_invoked: bool, - maybe_pre_resolved: Option, wf_time: Option, internal_flags: InternalFlagsRef, ) -> Result<(LocalActivityMachine, Vec), WFMachinesError> { let initial_state = if replaying_when_invoked { - if let Some(dat) = maybe_pre_resolved { - ReplayingPreResolved { dat }.into() - } else { - Replaying {}.into() - } + Replaying {}.into() } else { - if maybe_pre_resolved.is_some() { - return Err(nondeterminism!( - "Local activity cannot be created as pre-resolved while not replaying" - )); - } Executing {}.into() }; @@ -202,15 +189,13 @@ impl LocalActivityMachine { /// command-event processing - instead simply applying the event to this machine and then /// skipping over the rest. If this machine is in the `ResultNotified` state, that means /// command handling should proceed as normal (ie: The command needs to be matched and removed). - /// The other valid states to make this check in are the `WaitingMarkerEvent[PreResolved]` - /// states, which will return true. /// /// Attempting the check in any other state likely means a bug in the SDK. 
pub(super) fn marker_should_get_special_handling(&self) -> Result { match self.state() { LocalActivityMachineState::ResultNotified(_) => Ok(false), - LocalActivityMachineState::WaitingMarkerEvent(_) => Ok(true), - LocalActivityMachineState::WaitingMarkerEventPreResolved(_) => Ok(true), + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true), + LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true), _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", self.state() @@ -223,7 +208,7 @@ impl LocalActivityMachine { pub(super) fn will_accept_resolve_marker(&self) -> bool { matches!( self.state(), - LocalActivityMachineState::WaitingMarkerEvent(_) + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) ) } @@ -231,10 +216,21 @@ impl LocalActivityMachine { pub(super) fn encountered_non_replay_wft( &mut self, ) -> Result, WFMachinesError> { + if matches!( + self.state(), + LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) + ) { + panic!( + "Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}", + self.shared_state.attrs.seq, + self.state(), + ); + } + // This only applies to the waiting-for-marker state. 
It can safely be ignored in the others if !matches!( self.state(), - LocalActivityMachineState::WaitingMarkerEvent(_) + LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) ) { return Ok(vec![]); } @@ -448,31 +444,10 @@ impl MarkerCommandRecorded { #[derive(Default, Clone)] pub(super) struct Replaying {} impl Replaying { - pub(super) fn on_schedule(self) -> LocalActivityMachineTransition { - TransitionResult::ok( - [], - WaitingMarkerEvent { - already_resolved: false, - }, - ) - } -} - -#[derive(Clone)] -pub(super) struct ReplayingPreResolved { - dat: ResolveDat, -} -impl ReplayingPreResolved { pub(super) fn on_schedule( self, - ) -> LocalActivityMachineTransition { - TransitionResult::ok( - [ - LocalActivityCommand::FakeMarker, - LocalActivityCommand::Resolved(self.dat), - ], - WaitingMarkerEventPreResolved {}, - ) + ) -> LocalActivityMachineTransition { + TransitionResult::ok([], WaitingResolveFromMarkerLookAhead {}) } } @@ -559,11 +534,11 @@ impl ResultNotified { } #[derive(Default, Clone)] -pub(super) struct WaitingMarkerEvent { - already_resolved: bool, -} +pub(super) struct WaitingResolveFromMarkerLookAhead {} -impl WaitingMarkerEvent { +impl WaitingResolveFromMarkerLookAhead { + // FIXME(JWH): I dont think this transition exists any longer. 
+ /* pub(super) fn on_marker_recorded( self, shared: &mut SharedState, @@ -572,22 +547,21 @@ impl WaitingMarkerEvent { verify_marker_dat!( shared, &dat, - TransitionResult::commands(if self.already_resolved { - vec![] - } else { - vec![LocalActivityCommand::Resolved(dat.into())] - }) + TransitionResult::commands(vec![LocalActivityCommand::Resolved(dat.into())]) ) } + */ + fn on_handle_result( self, dat: ResolveDat, - ) -> LocalActivityMachineTransition { + ) -> LocalActivityMachineTransition { TransitionResult::ok( - [LocalActivityCommand::Resolved(dat)], - WaitingMarkerEvent { - already_resolved: true, - }, + [ + // LocalActivityCommand::FakeMarker, + LocalActivityCommand::Resolved(dat), + ], + ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, ) } pub(super) fn on_started_non_replay_wft( @@ -601,7 +575,9 @@ impl WaitingMarkerEvent { )]) } - fn on_cancel_requested(self) -> LocalActivityMachineTransition { + fn on_cancel_requested( + self, + ) -> LocalActivityMachineTransition { // We still "request a cancel" even though we know the local activity should not be running // because the data might be in the pre-resolved list. 
TransitionResult::ok([LocalActivityCommand::RequestCancel], self) @@ -610,7 +586,7 @@ impl WaitingMarkerEvent { fn on_no_wait_cancel( self, _: ActivityCancellationType, - ) -> LocalActivityMachineTransition { + ) -> LocalActivityMachineTransition { // Markers are always recorded when cancelling, so this is the same as a normal cancel on // the replay path self.on_cancel_requested() @@ -618,14 +594,51 @@ impl WaitingMarkerEvent { } #[derive(Default, Clone)] -pub(super) struct WaitingMarkerEventPreResolved {} -impl WaitingMarkerEventPreResolved { +pub(super) struct ResolvedFromMarkerLookAheadWaitingMarkerEvent {} + +impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { pub(super) fn on_marker_recorded( self, shared: &mut SharedState, dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { - verify_marker_dat!(shared, &dat, TransitionResult::default()) + verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) + } + // fn on_handle_result( + // self, + // dat: ResolveDat, + // ) -> LocalActivityMachineTransition { + // TransitionResult::ok( + // [LocalActivityCommand::Resolved(dat)], + // ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, + // ) + // } + // pub(super) fn on_started_non_replay_wft( + // self, + // dat: &mut SharedState, + // ) -> LocalActivityMachineTransition { + // // We aren't really "replaying" anymore for our purposes, and want to record the marker. + // dat.replaying_when_invoked = false; + // TransitionResult::commands([LocalActivityCommand::RequestActivityExecution( + // dat.attrs.clone(), + // )]) + // } + + fn on_cancel_requested( + self, + ) -> LocalActivityMachineTransition { + // We still "request a cancel" even though we know the local activity should not be running + // because the data might be in the pre-resolved list. 
+ TransitionResult::ok([LocalActivityCommand::RequestCancel], self) + } + + fn on_no_wait_cancel( + self, + _: ActivityCancellationType, + ) -> LocalActivityMachineTransition { + // Markers are always recorded when cancelling, so this is the same as a normal cancel on + // the replay path + self.on_cancel_requested() } } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index cdc2c93f5..c7ffb1f3e 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -540,6 +540,7 @@ impl WorkflowMachines { pub(crate) fn iterate_machines(&mut self) -> Result<()> { let results = self.drive_me.fetch_workflow_iteration_output(); self.handle_driven_results(results)?; + self.apply_local_action_peeked_resolutions()?; self.prepare_commands()?; if self.workflow_is_finished() && let Some(rt) = self.total_runtime() @@ -549,6 +550,25 @@ impl WorkflowMachines { Ok(()) } + fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { + while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { + let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { + // If we haven't encountered the LA schedule yet, stop processing the + // preresolutions. + break; + }; + // Look to make this "safe" + let dat = self.local_activity_data.take_preresolution(seq).unwrap(); + if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { + let resps = lam.try_resolve_with_dat(dat)?; + self.process_machine_responses(mk, resps)?; + } else { + panic!("Found non-LAM for LA command"); + } + } + Ok(()) + } + /// Returns true if machines are ready to apply the next WFT sequence, false if events will need /// to be fetched in order to create a complete update with the entire next WFT sequence. 
pub(crate) fn ready_to_apply_next_wft(&self) -> bool { @@ -816,6 +836,7 @@ impl WorkflowMachines { DelayedAction::WakeLa(mk, la_dat) => { let mach = self.machine_mut(mk); if let Machines::LocalActivityMachine(ref mut lam) = *mach { + // self.local_activity_data.insert_peeked_marker(*la_dat); if lam.will_accept_resolve_marker() { let resps = lam.try_resolve_with_dat((*la_dat).into())?; self.process_machine_responses(mk, resps)?; @@ -1373,13 +1394,16 @@ impl WorkflowMachines { let (la, mach_resp) = new_local_activity( attrs, self.replaying, - self.local_activity_data.take_preresolution(seq), self.current_wf_time, self.observed_internal_flags.clone(), )?; let machkey = self.all_machines.insert(la.into()); self.id_to_machine .insert(CommandID::LocalActivity(seq), machkey); + // This now only does additional things on execute + // Previously on resolved replay it would be processing + // [LocalActivityCommand::FakeMarker, LocalActivityCommand::Resolved(self.dat)] + // replay will alway exit in `WaitingResolveFromMarkerLookAhead` state self.process_machine_responses(machkey, mach_resp)?; } WFCommandVariant::RequestCancelActivity(attrs) => { diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs index bb24a1a92..e5d1b7a9e 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs @@ -4,7 +4,7 @@ use crate::{ worker::{ExecutingLAId, LocalActRequest, NewLocalAct}, }; use std::{ - collections::{HashMap, HashSet}, + collections::{HashSet, VecDeque}, time::SystemTime, }; use temporalio_common::protos::temporal::api::common::v1::WorkflowExecution; @@ -19,7 +19,7 @@ pub(super) struct LocalActivityData { executing: HashSet, /// Maps local activity sequence numbers to their resolutions as found when looking ahead at /// next WFT - preresolutions: HashMap, + 
preresolutions: VecDeque<(u32, ResolveDat)>, /// Set true if the workflow is terminating am_terminating: bool, } @@ -78,11 +78,23 @@ impl LocalActivityData { } pub(super) fn insert_peeked_marker(&mut self, dat: CompleteLocalActivityData) { - self.preresolutions.insert(dat.marker_dat.seq, dat.into()); + self.preresolutions + .push_back((dat.marker_dat.seq, dat.into())); } pub(super) fn take_preresolution(&mut self, seq: u32) -> Option { - self.preresolutions.remove(&seq) + let idx = self + .preresolutions + .iter() + .enumerate() + .find_map(|(ix, (s, _))| (*s == seq).then_some(ix))?; + let (_, dat) = self.preresolutions.remove(idx).unwrap(); + Some(dat) + } + + pub(super) fn peek_preresolution_seq(&self) -> Option { + let (seq, _) = self.preresolutions.front()?; + Some(*seq) } pub(super) fn remove_from_queue(&mut self, seq: u32) -> Option { diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index d42dcd693..f6c24db4c 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -726,6 +726,8 @@ impl ManagedRun { if let Some(update) = update_from_new_page { self.wfm.feed_history_from_new_page(update)?; } + // here the las will be in `WaitingResolveFromMarkerLookAhead` on replay and no + // presolutions. // Don't bother applying the next task if we're evicting at the end of this activation // or are otherwise broken. if !completion.activation_was_eviction && !self.am_broken { @@ -1424,7 +1426,13 @@ impl WorkflowManager { return Ok(()); } loop { + // On first WFT, this will look forward and peek the results from next WFT + // On the let consumed_events = self.machines.apply_next_wft_from_history()?; + // Now we have presolutions + // previously, this las would be in the + // + // Maybe we apply presolutions here? 
if consumed_events == 0 || !self.machines.replaying || self.machines.has_pending_jobs() { From a0f9ff4cc2195390542d46dae6f1eff76f6a114d Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 14:31:25 -0500 Subject: [PATCH 03/11] avoid saving unneeded peeked resolutions --- .../workflow/machines/local_activity_state_machine.rs | 1 - .../src/worker/workflow/machines/workflow_machines.rs | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 48f639d1c..35b050798 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -97,7 +97,6 @@ fsm! { // LAs reporting status after they've handled their result can simply be ignored. We could // optimize this away higher up but that feels very overkill. 
MarkerCommandCreated --(HandleResult(ResolveDat)) --> MarkerCommandCreated; - MarkerCommandCreated --(HandleKnownResult(ResolveDat)) --> MarkerCommandCreated; ResultNotified --(HandleResult(ResolveDat)) --> ResultNotified; MarkerCommandRecorded --(HandleResult(ResolveDat)) --> MarkerCommandRecorded; } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index c7ffb1f3e..3a3e204be 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -836,12 +836,16 @@ impl WorkflowMachines { DelayedAction::WakeLa(mk, la_dat) => { let mach = self.machine_mut(mk); if let Machines::LocalActivityMachine(ref mut lam) = *mach { - // self.local_activity_data.insert_peeked_marker(*la_dat); if lam.will_accept_resolve_marker() { let resps = lam.try_resolve_with_dat((*la_dat).into())?; self.process_machine_responses(mk, resps)?; } else { - self.local_activity_data.insert_peeked_marker(*la_dat); + // Since the LA machine exists, we have encountered the LA WF command. + // But since it will not accept a resolve marker, we have no use saving + // this marker. + // Previously, peeked markers were stored in a map, but since we are + // now storing in an ordered collection we no longer can store this + // unnecessary data as `apply_local_action_peeked_resolutions` will attempt to apply it. 
} } } From 2040383660bee3c6d696829b0c16c5735c058683 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 15:30:20 -0500 Subject: [PATCH 04/11] remove dead code, clean up comments --- .../machines/local_activity_state_machine.rs | 51 +------------------ .../workflow/machines/workflow_machines.rs | 25 ++++----- .../machines/workflow_machines/local_acts.rs | 9 ++-- 3 files changed, 14 insertions(+), 71 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 35b050798..1806c3b03 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -363,12 +363,6 @@ pub(super) enum LocalActivityCommand { RequestActivityExecution(ValidScheduleLA), #[display("Resolved")] Resolved(ResolveDat), - /// The fake marker is used to avoid special casing marker recorded event handling. - /// If we didn't have the fake marker, there would be no "outgoing command" to match - /// against the event. This way there is, but the command never will be issued to - /// server because it is understood to be meaningless. - #[display("FakeMarker")] - FakeMarker, /// Indicate we want to cancel an LA that is currently executing, or look up if we have /// processed a marker with resolution data since the machine was constructed. #[display("Cancel")] @@ -536,30 +530,12 @@ impl ResultNotified { pub(super) struct WaitingResolveFromMarkerLookAhead {} impl WaitingResolveFromMarkerLookAhead { - // FIXME(JWH): I dont think this transition exists any longer. 
- /* - pub(super) fn on_marker_recorded( - self, - shared: &mut SharedState, - dat: CompleteLocalActivityData, - ) -> LocalActivityMachineTransition { - verify_marker_dat!( - shared, - &dat, - TransitionResult::commands(vec![LocalActivityCommand::Resolved(dat.into())]) - ) - } - */ - fn on_handle_result( self, dat: ResolveDat, ) -> LocalActivityMachineTransition { TransitionResult::ok( - [ - // LocalActivityCommand::FakeMarker, - LocalActivityCommand::Resolved(dat), - ], + [LocalActivityCommand::Resolved(dat)], ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, ) } @@ -603,25 +579,6 @@ impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { ) -> LocalActivityMachineTransition { verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) } - // fn on_handle_result( - // self, - // dat: ResolveDat, - // ) -> LocalActivityMachineTransition { - // TransitionResult::ok( - // [LocalActivityCommand::Resolved(dat)], - // ResolvedFromMarkerLookAheadWaitingMarkerEvent {}, - // ) - // } - // pub(super) fn on_started_non_replay_wft( - // self, - // dat: &mut SharedState, - // ) -> LocalActivityMachineTransition { - // // We aren't really "replaying" anymore for our purposes, and want to record the marker. 
- // dat.replaying_when_invoked = false; - // TransitionResult::commands([LocalActivityCommand::RequestActivityExecution( - // dat.attrs.clone(), - // )]) - // } fn on_cancel_requested( self, @@ -792,12 +749,6 @@ impl WFMachinesAdapter for LocalActivityMachine { } Ok(responses) } - LocalActivityCommand::FakeMarker => { - // See docs for `FakeMarker` for more - Ok(vec![MachineResponse::IssueFakeLocalActivityMarker( - self.shared_state.attrs.seq, - )]) - } LocalActivityCommand::RequestCancel => { Ok(vec![MachineResponse::RequestCancelLocalActivity( self.shared_state.attrs.seq, diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 3a3e204be..15a45b7e2 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -181,8 +181,6 @@ struct CommandAndMachine { #[derive(Debug, derive_more::Display)] enum MachineAssociatedCommand { Real(Box), - #[display("FakeLocalActivityMarker({_0})")] - FakeLocalActivityMarker(u32), } #[derive(Debug, Clone, Copy)] @@ -209,8 +207,6 @@ pub(super) enum MachineResponse { /// collisions. 
#[display("NewCoreOriginatedCommand({_0:?})")] NewCoreOriginatedCommand(ProtoCmdAttrs), - #[display("IssueFakeLocalActivityMarker({_0})")] - IssueFakeLocalActivityMarker(u32), #[display("TriggerWFTaskStarted")] TriggerWFTaskStarted { task_started_event_id: i64, @@ -426,7 +422,6 @@ impl WorkflowMachines { if !self.machine(c.machine).is_final_state() { match &c.command { MachineAssociatedCommand::Real(cmd) => Some((**cmd).clone()), - MachineAssociatedCommand::FakeLocalActivityMarker(_) => None, } } else { None @@ -553,17 +548,22 @@ impl WorkflowMachines { fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { - // If we haven't encountered the LA schedule yet, stop processing the + // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the // preresolutions. break; }; - // Look to make this "safe" - let dat = self.local_activity_data.take_preresolution(seq).unwrap(); + let dat = self + .local_activity_data + .take_preresolution(seq) + .expect("This seq was just given by `peek_preresolution_seq`"); if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { let resps = lam.try_resolve_with_dat(dat)?; self.process_machine_responses(mk, resps)?; } else { - panic!("Found non-LAM for LA command"); + return Err(fatal!( + "Peeked local activity marker but the associated machine was of the \ + wrong type! 
{dat:?}" + )); } } Ok(()) @@ -1164,7 +1164,6 @@ impl WorkflowMachines { .handle_command(cmd.command_type())?; self.process_machine_responses(c.machine, machine_responses)?; } - MachineAssociatedCommand::FakeLocalActivityMarker(_) => {} } self.commands.push_back(c); } @@ -1251,12 +1250,6 @@ impl WorkflowMachines { )); } }, - MachineResponse::IssueFakeLocalActivityMarker(seq) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: MachineAssociatedCommand::FakeLocalActivityMarker(seq), - machine: smk, - }); - } MachineResponse::QueueLocalActivity(act) => { self.local_activity_data.enqueue(act); } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs index e5d1b7a9e..b1954226d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines/local_acts.rs @@ -83,12 +83,11 @@ impl LocalActivityData { } pub(super) fn take_preresolution(&mut self, seq: u32) -> Option { - let idx = self + let idx = self.preresolutions.iter().position(|(s, _)| *s == seq)?; + let (_, dat) = self .preresolutions - .iter() - .enumerate() - .find_map(|(ix, (s, _))| (*s == seq).then_some(ix))?; - let (_, dat) = self.preresolutions.remove(idx).unwrap(); + .remove(idx) + .expect("This index was just found to contain seq"); Some(dat) } From 244deb3c35e3ec4077283525e5f7d38d1da50283 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 15:37:28 -0500 Subject: [PATCH 05/11] remove MachineAssociatedCommand --- .../workflow/machines/workflow_machines.rs | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 15a45b7e2..4c7b04bb5 100644 --- 
a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -174,15 +174,10 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::Display)] #[display("Cmd&Machine({command})")] struct CommandAndMachine { - command: MachineAssociatedCommand, + command: ProtoCommand, machine: MachineKey, } -#[derive(Debug, derive_more::Display)] -enum MachineAssociatedCommand { - Real(Box), -} - #[derive(Debug, Clone, Copy)] struct ChangeInfo { created_command: bool, @@ -420,9 +415,7 @@ impl WorkflowMachines { .write_all_known(); self.commands.iter().filter_map(|c| { if !self.machine(c.machine).is_final_state() { - match &c.command { - MachineAssociatedCommand::Real(cmd) => Some((**cmd).clone()), - } + Some(c.command.clone()) } else { None } @@ -1157,14 +1150,10 @@ impl WorkflowMachines { .machine(c.machine) .was_cancelled_before_sent_to_server() { - match &c.command { - MachineAssociatedCommand::Real(cmd) => { - let machine_responses = self - .machine_mut(c.machine) - .handle_command(cmd.command_type())?; - self.process_machine_responses(c.machine, machine_responses)?; - } - } + let machine_responses = self + .machine_mut(c.machine) + .handle_command(c.command.command_type())?; + self.process_machine_responses(c.machine, machine_responses)?; self.commands.push_back(c); } } @@ -1208,7 +1197,7 @@ impl WorkflowMachines { } MachineResponse::IssueNewCommand(c) => { self.current_wf_task_commands.push_back(CommandAndMachine { - command: MachineAssociatedCommand::Real(Box::new(c)), + command: c, machine: smk, }) } @@ -1644,7 +1633,7 @@ impl WorkflowMachines { user_metadata: metadata, }; CommandAndMachine { - command: MachineAssociatedCommand::Real(Box::new(cmd)), + command: cmd, machine: k, } } From e659d5a5284dfd9d350ed7d3d71015d1f5b6e381 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 9 Dec 2025 16:48:46 -0500 Subject: [PATCH 06/11] clean up comments --- 
.../machines/local_activity_state_machine.rs | 13 +------------ .../worker/workflow/machines/workflow_machines.rs | 4 ---- crates/sdk-core/src/worker/workflow/managed_run.rs | 8 -------- 3 files changed, 1 insertion(+), 24 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 1806c3b03..118b25828 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -215,17 +215,6 @@ impl LocalActivityMachine { pub(super) fn encountered_non_replay_wft( &mut self, ) -> Result, WFMachinesError> { - if matches!( - self.state(), - LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) - ) { - panic!( - "Invalid transition while notifying local activity (seq {}) of non-replay-wft-started in {}", - self.shared_state.attrs.seq, - self.state(), - ); - } - // This only applies to the waiting-for-marker state. 
It can safely be ignored in the others if !matches!( self.state(), @@ -577,7 +566,7 @@ impl ResolvedFromMarkerLookAheadWaitingMarkerEvent { shared: &mut SharedState, dat: CompleteLocalActivityData, ) -> LocalActivityMachineTransition { - verify_marker_dat!(shared, &dat, TransitionResult::commands(vec![])) + verify_marker_dat!(shared, &dat, TransitionResult::default()) } fn on_cancel_requested( diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 4c7b04bb5..dea84705d 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1386,10 +1386,6 @@ impl WorkflowMachines { let machkey = self.all_machines.insert(la.into()); self.id_to_machine .insert(CommandID::LocalActivity(seq), machkey); - // This now only does additional things on execute - // Previously on resolved replay it would be processing - // [LocalActivityCommand::FakeMarker, LocalActivityCommand::Resolved(self.dat)] - // replay will alway exit in `WaitingResolveFromMarkerLookAhead` state self.process_machine_responses(machkey, mach_resp)?; } WFCommandVariant::RequestCancelActivity(attrs) => { diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index f6c24db4c..d42dcd693 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -726,8 +726,6 @@ impl ManagedRun { if let Some(update) = update_from_new_page { self.wfm.feed_history_from_new_page(update)?; } - // here the las will be in `WaitingResolveFromMarkerLookAhead` on replay and no - // presolutions. // Don't bother applying the next task if we're evicting at the end of this activation // or are otherwise broken. 
if !completion.activation_was_eviction && !self.am_broken { @@ -1426,13 +1424,7 @@ impl WorkflowManager { return Ok(()); } loop { - // On first WFT, this will look forward and peek the results from next WFT - // On the let consumed_events = self.machines.apply_next_wft_from_history()?; - // Now we have presolutions - // previously, this las would be in the - // - // Maybe we apply presolutions here? if consumed_events == 0 || !self.machines.replaying || self.machines.has_pending_jobs() { From 4cc9796a39a2f30819336a12fd5ecfeef5c6e941 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 10 Dec 2025 08:50:54 -0500 Subject: [PATCH 07/11] more comments, fix formatting --- .../include/temporal-sdk-core-c-bridge.h | 30 +++++------ .../machines/local_activity_state_machine.rs | 1 - .../workflow/machines/workflow_machines.rs | 50 ++++++++++--------- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index 80e997027..25c38f946 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -5,13 +5,13 @@ #include #include -typedef enum TemporalCoreForwardedLogLevel { - Trace = 0, - Debug, - Info, - Warn, - Error, -} TemporalCoreForwardedLogLevel; +typedef enum TemporalCoreRpcService { + Workflow = 1, + Operator, + Cloud, + Test, + Health, +} TemporalCoreRpcService; typedef enum TemporalCoreMetricAttributeValueType { String = 1, @@ -29,6 +29,14 @@ typedef enum TemporalCoreMetricKind { GaugeFloat, } TemporalCoreMetricKind; +typedef enum TemporalCoreForwardedLogLevel { + Trace = 0, + Debug, + Info, + Warn, + Error, +} TemporalCoreForwardedLogLevel; + typedef enum TemporalCoreOpenTelemetryMetricTemporality { Cumulative = 1, Delta, @@ -39,14 +47,6 @@ typedef enum TemporalCoreOpenTelemetryProtocol { Http, } TemporalCoreOpenTelemetryProtocol; -typedef enum 
TemporalCoreRpcService { - Workflow = 1, - Operator, - Cloud, - Test, - Health, -} TemporalCoreRpcService; - typedef enum TemporalCoreSlotKindType { WorkflowSlotKindType, ActivitySlotKindType, diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index 118b25828..beab9bd0e 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -65,7 +65,6 @@ fsm! { --> MarkerCommandRecorded; // Replay path ================================================================================ - //WaitingResolveFromMarkerLookAhead --(MarkerRecorded(CompleteLocalActivityData), shared on_marker_recorded) --> MarkerCommandRecorded; WaitingResolveFromMarkerLookAhead --(HandleKnownResult(ResolveDat), on_handle_result) --> ResolvedFromMarkerLookAheadWaitingMarkerEvent; // If we are told to cancel while waiting for the marker, we still need to wait for the marker. WaitingResolveFromMarkerLookAhead --(Cancel, on_cancel_requested) --> WaitingResolveFromMarkerLookAhead; diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index dea84705d..9116b28e6 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -538,30 +538,6 @@ impl WorkflowMachines { Ok(()) } - fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { - while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { - let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { - // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the - // preresolutions. 
- break; - }; - let dat = self - .local_activity_data - .take_preresolution(seq) - .expect("This seq was just given by `peek_preresolution_seq`"); - if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { - let resps = lam.try_resolve_with_dat(dat)?; - self.process_machine_responses(mk, resps)?; - } else { - return Err(fatal!( - "Peeked local activity marker but the associated machine was of the \ - wrong type! {dat:?}" - )); - } - } - Ok(()) - } - /// Returns true if machines are ready to apply the next WFT sequence, false if events will need /// to be fetched in order to create a complete update with the entire next WFT sequence. pub(crate) fn ready_to_apply_next_wft(&self) -> bool { @@ -1681,6 +1657,32 @@ impl WorkflowMachines { } } } + + // Applies local action preresolutions peeked from history until encountering a result for an + // LA that has yet to be scheduled. + fn apply_local_action_peeked_resolutions(&mut self) -> Result<()> { + while let Some(seq) = self.local_activity_data.peek_preresolution_seq() { + let Ok(mk) = self.get_machine_key(CommandID::LocalActivity(seq)) else { + // If we haven't encountered the LA schedule for the first preresolution yet, stop processing the + // preresolutions. + break; + }; + let dat = self + .local_activity_data + .take_preresolution(seq) + .expect("This seq was just given by `peek_preresolution_seq`"); + if let Machines::LocalActivityMachine(lam) = self.machine_mut(mk) { + let resps = lam.try_resolve_with_dat(dat)?; + self.process_machine_responses(mk, resps)?; + } else { + return Err(fatal!( + "Peeked local activity marker but the associated machine was of the \ + wrong type! 
{dat:?}" + )); + } + } + Ok(()) + } } /// Contains everything workflow machine internals need to bubble up when we're getting ready to From 18e19d74c883dfc44d0f7e224ece34d078a477ac Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 10 Dec 2025 10:12:34 -0500 Subject: [PATCH 08/11] remove special case check --- .../src/worker/workflow/machines/local_activity_state_machine.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs index beab9bd0e..a87fdc3a1 100644 --- a/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/crates/sdk-core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -192,7 +192,6 @@ impl LocalActivityMachine { pub(super) fn marker_should_get_special_handling(&self) -> Result { match self.state() { LocalActivityMachineState::ResultNotified(_) => Ok(false), - LocalActivityMachineState::WaitingResolveFromMarkerLookAhead(_) => Ok(true), LocalActivityMachineState::ResolvedFromMarkerLookAheadWaitingMarkerEvent(_) => Ok(true), _ => Err(fatal!( "Attempted to check for LA marker handling in invalid state {}", From b85167bb41e44a84fee4e669878268d0e20cf7ab Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 17 Dec 2025 10:40:36 -0500 Subject: [PATCH 09/11] make error nde --- .../sdk-core/src/worker/workflow/machines/workflow_machines.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 9116b28e6..9712f6765 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -1675,7 +1675,7 @@ impl WorkflowMachines { let resps = lam.try_resolve_with_dat(dat)?; self.process_machine_responses(mk, resps)?; } else { - 
return Err(fatal!( + return Err(nondeterminism!( "Peeked local activity marker but the associated machine was of the \ wrong type! {dat:?}" )); From 6e3ac66a61d45207d1176f58c353b888ef47352b Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Fri, 19 Dec 2025 09:51:30 -0500 Subject: [PATCH 10/11] additional tests --- .../workflow_tests/local_activities.rs | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index 479cd799a..4beda5811 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -3385,3 +3385,178 @@ async fn cancel_after_act_starts_canned( }); worker.run().await.unwrap(); } + +// 2 LAs scheduled in same WFT, LA2 completes in the same WFT, LA1 completes in +// following WFT after heartbeat. +#[rstest] +#[tokio::test] +async fn mixed_la_completion_times(#[values(true, false)] replay: bool) { + let wft_timeout = Duration::from_millis(100); + let mut t = TestHistoryBuilder::default(); + // Short WFT timeout to trigger heartbeat behavior + t.add_wfe_started_with_wft_timeout(wft_timeout); + t.add_full_wf_task(); + // LA2 (fast, seq=2) completes in same WFT, marker recorded immediately + t.add_local_activity_result_marker(2, "2", b"Result".into()); + // Heartbeat WFT (because LA1 is still running) + t.add_full_wf_task(); + // LA1 (slow, seq=1) completes in this WFT + t.add_local_activity_result_marker(1, "1", b"Result".into()); + t.add_workflow_task_scheduled_and_started(); + + let wf_id = "fakeid"; + let mock = mock_worker_client(); + let mut mock_cfg = if replay { + MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock) + } else { + MockPollCfg::from_hist_builder(t) + }; + + let mut aai = ActivationAssertionsInterceptor::default(); + aai.skip_one() + .then(|a| { + assert_matches!( + 
a.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra)), + }] => assert_eq!(ra.seq, 2) + ); + }) + .then(|a| { + assert_matches!( + a.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra)), + }] => assert_eq!(ra.seq, 1) + ); + }); + + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + if replay { + asserts.then(|wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + } else { + asserts + .then(|wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!(wft.commands[0].command_type(), CommandType::RecordMarker); + }) + .then(|wft| { + assert_eq!(wft.commands.len(), 2); + assert_eq!(wft.commands[0].command_type(), CommandType::RecordMarker); + assert_eq!( + wft.commands[1].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + } + }); + let mut worker = build_fake_sdk(mock_cfg); + worker.set_worker_interceptor(aai); + worker.register_workflow::(); + worker.register_activities(ResolvedActivity); + worker.run().await.unwrap(); +} + +/// Test: 2 LAs scheduled in same WFT, both complete in a different (later) WFT. +/// This tests marker lookahead when multiple markers need resolution after heartbeat. 
+/// +/// History structure: +/// ```text +/// WFT1: Schedule LA1 (seq=1) + LA2 (seq=2) +/// WFT2: Heartbeat WFT (both LAs still running) +/// LA1 marker recorded +/// LA2 marker recorded +/// WFT3: Workflow completes +/// ``` + +#[rstest] +#[tokio::test] +async fn two_las_with_heartbeat( + #[values(true, false)] replay: bool, + #[values(true, false)] complete_in_schedule_order: bool, +) { + let mut t = TestHistoryBuilder::default(); + t.add_wfe_started_with_wft_timeout(Duration::from_millis(100)); + t.add_full_wf_task(); + // Heartbeat before resolving LA + t.add_full_wf_task(); + if complete_in_schedule_order { + t.add_local_activity_result_marker(1, "1", b"hi".into()); + t.add_local_activity_result_marker(2, "2", b"hi".into()); + } else { + t.add_local_activity_result_marker(2, "2", b"hi".into()); + t.add_local_activity_result_marker(1, "1", b"hi".into()); + } + t.add_workflow_task_scheduled_and_started(); + let mut mock_cfg = if replay { + MockPollCfg::from_resps(t, [ResponseType::AllHistory]) + } else { + MockPollCfg::from_hist_builder(t) + }; + + let mut aai = ActivationAssertionsInterceptor::default(); + aai.skip_one().then(move |a| { + let (first_expected_seq, second_expected_seq) = if complete_in_schedule_order { + (1, 2) + } else { + (2, 1) + }; + // TODO: When actually executing these race depending on which future scheduling + // Can resolve in one or 2 activations with different ordering. How do we fix this? 
+ if replay { + assert_matches!( + a.jobs.as_slice(), + [ + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra1)) + }, + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::ResolveActivity(ra2)) + } + ] => { + assert_eq!(ra1.seq, first_expected_seq); + assert_eq!(ra2.seq, second_expected_seq); + } + ); + } + }); + + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + if replay { + asserts.then(|wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + } else { + asserts + .then(|wft| { + assert_eq!(wft.commands.len(), 0); + }) + .then(|wft| { + assert_eq!(wft.commands.len(), 3); + assert_eq!(wft.commands[0].command_type(), CommandType::RecordMarker); + assert_eq!(wft.commands[1].command_type(), CommandType::RecordMarker); + assert_eq!( + wft.commands[2].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + } + }); + + let mut worker = build_fake_sdk(mock_cfg); + worker.set_worker_interceptor(aai); + worker.register_workflow::(); + worker.register_activities(ResolvedActivity); + worker.run().await.unwrap(); +} + From 2afa4af2b9a34a1af7b599c6e723c183ebf736c8 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Thu, 12 Mar 2026 12:35:08 -0400 Subject: [PATCH 11/11] fix test workflows to have correct ordering --- .../workflow_tests/local_activities.rs | 57 ++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs index 4beda5811..120769d0e 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/local_activities.rs @@ -3074,6 +3074,18 @@ async fn las_separated_by_timer(#[case] replay: bool) { worker.run().await.unwrap(); } +struct SleepActivity; + 
+#[activities] +impl SleepActivity { + #[allow(unused)] + #[activity(name = DEFAULT_ACTIVITY_TYPE)] + async fn sleep_ms(_ctx: ActivityContext, sleep_ms: u64) -> Result { + tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + Ok("Resolved".to_string()) + } +} + #[workflow] #[derive(Default)] struct ParallelLasJobOrderWf; @@ -3083,8 +3095,16 @@ impl ParallelLasJobOrderWf { #[run(name = DEFAULT_WORKFLOW_TYPE)] async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { let _ = temporalio_sdk::workflows::join!( - ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()), - ctx.start_local_activity(StdActivities::default, (), LocalActivityOptions::default()) + ctx.start_local_activity( + SleepActivity::sleep_ms, + 100u64, + LocalActivityOptions::default() + ), + ctx.start_local_activity( + SleepActivity::sleep_ms, + 1u64, + LocalActivityOptions::default() + ) ); Ok(()) } @@ -3138,7 +3158,7 @@ async fn parallel_las_job_order(#[values(true, false)] replay: bool) { let mut worker = build_fake_sdk(mock_cfg); worker.set_worker_interceptor(aai); worker.register_workflow::(); - worker.register_activities(ResolvedActivity); + worker.register_activities(SleepActivity); worker.run().await.unwrap(); } @@ -3458,8 +3478,8 @@ async fn mixed_la_completion_times(#[values(true, false)] replay: bool) { }); let mut worker = build_fake_sdk(mock_cfg); worker.set_worker_interceptor(aai); - worker.register_workflow::(); - worker.register_activities(ResolvedActivity); + worker.register_workflow::(); + worker.register_activities(SleepActivity); worker.run().await.unwrap(); } @@ -3553,10 +3573,33 @@ async fn two_las_with_heartbeat( } }); + let la_completion_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); + + struct BarrierActivity { + barr: &'static Barrier, + wft_timeout: Duration, + } + #[activities] + impl BarrierActivity { + #[allow(unused)] + #[activity(name = DEFAULT_ACTIVITY_TYPE)] + async fn wait( + self: Arc, + _ctx: 
ActivityContext, + _: (), + ) -> Result { + tokio::time::sleep(self.wft_timeout.mul_f64(1.5)).await; + self.barr.wait().await; + Ok("hi".to_string()) + } + } + let mut worker = build_fake_sdk(mock_cfg); worker.set_worker_interceptor(aai); worker.register_workflow::(); - worker.register_activities(ResolvedActivity); + worker.register_activities(BarrierActivity { + barr: la_completion_barr, + wft_timeout: Duration::from_millis(100), + }); worker.run().await.unwrap(); } -