From b6ea93f59543594a78b61cb80d1de154610e2618 Mon Sep 17 00:00:00 2001 From: emiling Date: Thu, 23 Apr 2026 01:24:13 +0900 Subject: [PATCH 1/6] temporalio/sdk-core#1213 add WorkflowHistoryStream for history event streaming --- crates/client/src/workflow_handle.rs | 29 +++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 976dd07e7..f7e8f6fbb 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -8,7 +8,13 @@ use crate::{ }, grpc::WorkflowService, }; -use std::{fmt::Debug, marker::PhantomData}; +use futures_util::{ stream::Stream}; +use std::{ + fmt::Debug, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; pub use temporalio_common::UntypedWorkflow; use temporalio_common::{ HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition, @@ -303,6 +309,27 @@ impl WorkflowHistory { } } +/// A stream of history events from a workflow execution. +/// Internally paginates through results from the server. +pub struct WorkflowHistoryStream { + inner: Pin> + Send>>, +} + +impl WorkflowHistoryStream { + fn new( + inner: Pin> + Send>>, + ) -> Self { + Self { inner } + } +} + +impl Stream for WorkflowHistoryStream { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.as_mut().poll_next(cx) + } +} + /// A workflow handle which can refer to a specific workflow run, or a chain of workflow runs with /// the same workflow id. #[derive(Clone)] From 366f16c6dfd53d8678cb87ecebd6a1eb6577c34c Mon Sep 17 00:00:00 2001 From: emiling Date: Thu, 23 Apr 2026 01:36:17 +0900 Subject: [PATCH 2/6] temporalio/sdk-core#1213 add fetch_history_stream and helper for lazy history streaming --- crates/client/src/workflow_handle.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index f7e8f6fbb..a9bf0c4db 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -8,7 +8,7 @@ use crate::{ }, grpc::WorkflowService, }; -use futures_util::{ stream::Stream}; +use futures_util::stream::Stream; use std::{ fmt::Debug, marker::PhantomData, @@ -904,6 +904,18 @@ where self.fetch_history_for_run(&run_id, &opts).await } + /// Fetch workflow execution history as a lazy stream + pub fn fetch_history_stream( + &self, + opts: WorkflowFetchHistoryOptions, + ) -> WorkflowHistoryStream + where + CT: NamespacedClient, + { + let run_id = self.info.run_id.clone().unwrap_or_default(); + self.fetch_history_stream_for_run(&run_id, &opts) + } + /// Fetch history for a specific run_id, handling pagination. async fn fetch_history_for_run( &self, @@ -949,6 +961,15 @@ where Ok(WorkflowHistory::new(all_events)) } + + /// Fetch history for a specific run_id, handling pagination as a lazy stream + fn fetch_history_stream_for_run( + &self, + run_id: &str, + opts: &WorkflowFetchHistoryOptions, + ) -> WorkflowHistoryStream { + todo!() + } } /// Handle to a workflow update that has been started but may not be complete. From 121579c6816e892e881b86c9d8a9a0e310876854 Mon Sep 17 00:00:00 2001 From: emiling Date: Fri, 24 Apr 2026 22:49:46 +0900 Subject: [PATCH 3/6] temporalio/sdk-core#1213 implement streaming fetch for workflow execution history --- crates/client/src/workflow_handle.rs | 81 ++++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index a9bf0c4db..631ff9ab4 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -8,8 +8,9 @@ use crate::{ }, grpc::WorkflowService, }; -use futures_util::stream::Stream; +use futures_util::{stream, stream::Stream}; use std::{ + collections::VecDeque, fmt::Debug, marker::PhantomData, pin::Pin, @@ -910,7 +911,7 @@ where opts: WorkflowFetchHistoryOptions, ) -> WorkflowHistoryStream where - CT: NamespacedClient, + CT: WorkflowService + NamespacedClient + 'static, { let run_id = self.info.run_id.clone().unwrap_or_default(); self.fetch_history_stream_for_run(&run_id, &opts) @@ -967,8 +968,80 @@ where &self, run_id: &str, opts: &WorkflowFetchHistoryOptions, - ) -> WorkflowHistoryStream { - todo!() + ) -> WorkflowHistoryStream + where + CT: WorkflowService + NamespacedClient + 'static, + { + let client = self.client.clone(); + let workflow_id = self.info.workflow_id.clone(); + let run_id = run_id.to_string(); + let opts = opts.clone(); + + // State: (next_page_token, buffer, yielded_count, exhausted) + let initial_state = (Vec::new(), VecDeque::new(), 0, false); + + let stream = stream::unfold( + initial_state, + move |(next_page_token, mut buffer, mut yielded, exhausted)| { + let mut client = client.clone(); + let namespace = client.namespace(); + let workflow_id = workflow_id.clone(); + let run_id = run_id.clone(); + let opts = opts.clone(); + + async move { + if let Some(exec) = buffer.pop_front() { + yielded += 1; + return Some((Ok(exec), (next_page_token, buffer, yielded, exhausted))); + } + + if exhausted { + return None; + } + + let response = WorkflowService::get_workflow_execution_history( + &mut client, + GetWorkflowExecutionHistoryRequest { + namespace, + execution: Some(ProtoWorkflowExecution { + workflow_id, + run_id, + }), + next_page_token: next_page_token.clone(), + skip_archival: opts.skip_archival, + wait_new_event: opts.wait_new_event, + history_event_filter_type: opts.event_filter_type as i32, + ..Default::default() + } + .into_request(), + ) + .await; + + match response { + Ok(resp) => { + let resp = resp.into_inner(); + let new_exhausted = resp.next_page_token.is_empty(); + let new_token = resp.next_page_token; + + if let Some(history) = resp.history { + buffer = history.events.into_iter().collect(); + } + + if let Some(event) = buffer.pop_front() { + Some((Ok(event), (new_token, buffer, yielded, new_exhausted))) + } else { + None + } + } + Err(e) => Some(( + Err(WorkflowInteractionError::from_status(e)), + (next_page_token, buffer, yielded, true), + )), + } + } + }, + ); + WorkflowHistoryStream::new(Box::pin(stream)) } } From d048ad2b2eff7f2ce3d558df33b69796e81ca972 Mon Sep 17 00:00:00 2001 From: emiling Date: Fri, 24 Apr 2026 22:50:51 +0900 Subject: [PATCH 4/6] temporalio/sdk-core#1213 remove todo comment for stream implementation --- crates/client/src/workflow_handle.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 631ff9ab4..a9d9669d9 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -282,7 +282,6 @@ impl WorkflowExecutionDescription { } } -// TODO [rust-sdk-branch]: Could implment stream a-la ListWorkflowsStream /// Workflow execution history returned by `WorkflowHandle::fetch_history`. #[derive(Debug, Clone)] pub struct WorkflowHistory { From ac84eb68cd091dd0cc1d1d361b6480f07ef37aa1 Mon Sep 17 00:00:00 2001 From: emiling Date: Mon, 27 Apr 2026 23:29:27 +0900 Subject: [PATCH 5/6] temporalio/sdk-core#1213 make WorkflowHistory stream-based and retain into_events helper --- crates/client/src/workflow_handle.rs | 143 ++++++--------------------- 1 file changed, 29 insertions(+), 114 deletions(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index a9d9669d9..55940828c 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -8,7 +8,7 @@ use crate::{ }, grpc::WorkflowService, }; -use futures_util::{stream, stream::Stream}; +use futures_util::{TryStreamExt, stream, stream::Stream}; use std::{ collections::VecDeque, fmt::Debug, @@ -31,10 +31,7 @@ use temporalio_common::{ common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution}, enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage}, failure::v1::Failure, - history::{ - self, - v1::{HistoryEvent, history_event::Attributes}, - }, + history::v1::{HistoryEvent, history_event::Attributes}, query::v1::WorkflowQuery, sdk::v1::UserMetadata, update::{self, v1::WaitPolicy}, @@ -282,48 +279,26 @@ impl WorkflowExecutionDescription { } } -/// Workflow execution history returned by `WorkflowHandle::fetch_history`. -#[derive(Debug, Clone)] +/// Workflow execution history, lazily fetched as a stream from the server. +/// Use `into_events` to collect all events at once, or consume as a `stream`. pub struct WorkflowHistory { - events: Vec, -} -impl From for history::v1::History { - fn from(h: WorkflowHistory) -> Self { - Self { events: h.events } - } -} - -impl WorkflowHistory { - fn new(events: Vec) -> Self { - Self { events } - } - - /// The history events. - pub fn events(&self) -> &[HistoryEvent] { - &self.events - } - - /// Consume the history and return the events. - pub fn into_events(self) -> Vec { - self.events - } -} - -/// A stream of history events from a workflow execution. -/// Internally paginates through results from the server. -pub struct WorkflowHistoryStream { inner: Pin> + Send>>, } -impl WorkflowHistoryStream { +impl WorkflowHistory { fn new( inner: Pin> + Send>>, ) -> Self { Self { inner } } + + /// Consume the stream and collect all events into a Vec + pub async fn into_events(self) -> Result, WorkflowInteractionError> { + self.inner.try_collect().await + } } -impl Stream for WorkflowHistoryStream { +impl Stream for WorkflowHistory { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner.as_mut().poll_next(cx) @@ -489,7 +464,7 @@ where opts: WorkflowGetResultOptions, ) -> Result where - CT: WorkflowService + NamespacedClient + Clone, + CT: WorkflowService + NamespacedClient + Clone + 'static, { let raw = self.get_result_raw(opts).await?; match raw { @@ -514,7 +489,7 @@ where opts: WorkflowGetResultOptions, ) -> Result, WorkflowInteractionError> where - CT: WorkflowService + NamespacedClient + Clone, + CT: WorkflowService + NamespacedClient + Clone + 'static, { let mut run_id = self.info.run_id.clone().unwrap_or_default(); let fetch_opts = WorkflowFetchHistoryOptions::builder() @@ -524,8 +499,8 @@ where .build(); loop { - let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?; - let mut events = history.into_events(); + let history = self.fetch_history_for_run(&run_id, fetch_opts.clone()); + let mut events = history.into_events().await?; if events.is_empty() { continue; @@ -892,96 +867,37 @@ where .await .map_err(WorkflowInteractionError::from) } - /// Fetch workflow execution history. - pub async fn fetch_history( - &self, - opts: WorkflowFetchHistoryOptions, - ) -> Result - where - CT: NamespacedClient, - { - let run_id = self.info.run_id.clone().unwrap_or_default(); - self.fetch_history_for_run(&run_id, &opts).await - } /// Fetch workflow execution history as a lazy stream - pub fn fetch_history_stream( + pub fn fetch_history( &self, opts: WorkflowFetchHistoryOptions, - ) -> WorkflowHistoryStream + ) -> WorkflowHistory where - CT: WorkflowService + NamespacedClient + 'static, + CT: NamespacedClient + 'static, { let run_id = self.info.run_id.clone().unwrap_or_default(); - self.fetch_history_stream_for_run(&run_id, &opts) + self.fetch_history_for_run(&run_id, opts) } - /// Fetch history for a specific run_id, handling pagination. - async fn fetch_history_for_run( + fn fetch_history_for_run( &self, run_id: &str, - opts: &WorkflowFetchHistoryOptions, - ) -> Result - where - CT: NamespacedClient, - { - let mut all_events = Vec::new(); - let mut next_page_token = vec![]; - - loop { - let response = WorkflowService::get_workflow_execution_history( - &mut self.client.clone(), - GetWorkflowExecutionHistoryRequest { - namespace: self.client.namespace(), - execution: Some(ProtoWorkflowExecution { - workflow_id: self.info.workflow_id.clone(), - run_id: run_id.to_string(), - }), - next_page_token: next_page_token.clone(), - skip_archival: opts.skip_archival, - wait_new_event: opts.wait_new_event, - history_event_filter_type: opts.event_filter_type as i32, - ..Default::default() - } - .into_request(), - ) - .await - .map_err(WorkflowInteractionError::from_status)? - .into_inner(); - - if let Some(history) = response.history { - all_events.extend(history.events); - } - - if response.next_page_token.is_empty() { - break; - } - next_page_token = response.next_page_token; - } - - Ok(WorkflowHistory::new(all_events)) - } - - /// Fetch history for a specific run_id, handling pagination as a lazy stream - fn fetch_history_stream_for_run( - &self, - run_id: &str, - opts: &WorkflowFetchHistoryOptions, - ) -> WorkflowHistoryStream + opts: WorkflowFetchHistoryOptions, + ) -> WorkflowHistory where - CT: WorkflowService + NamespacedClient + 'static, + CT: NamespacedClient + 'static, { let client = self.client.clone(); let workflow_id = self.info.workflow_id.clone(); let run_id = run_id.to_string(); - let opts = opts.clone(); // State: (next_page_token, buffer, yielded_count, exhausted) - let initial_state = (Vec::new(), VecDeque::new(), 0, false); + let initial_state = (Vec::new(), VecDeque::new(), false); let stream = stream::unfold( initial_state, - move |(next_page_token, mut buffer, mut yielded, exhausted)| { + move |(next_page_token, mut buffer, exhausted)| { let mut client = client.clone(); let namespace = client.namespace(); let workflow_id = workflow_id.clone(); @@ -990,8 +906,7 @@ where async move { if let Some(exec) = buffer.pop_front() { - yielded += 1; - return Some((Ok(exec), (next_page_token, buffer, yielded, exhausted))); + return Some((Ok(exec), (next_page_token, buffer, exhausted))); } if exhausted { @@ -1027,20 +942,20 @@ where } if let Some(event) = buffer.pop_front() { - Some((Ok(event), (new_token, buffer, yielded, new_exhausted))) + Some((Ok(event), (new_token, buffer, new_exhausted))) } else { None } } Err(e) => Some(( Err(WorkflowInteractionError::from_status(e)), - (next_page_token, buffer, yielded, true), + (next_page_token, buffer, true), )), } } }, ); - WorkflowHistoryStream::new(Box::pin(stream)) + WorkflowHistory::new(Box::pin(stream)) } } From a55e7add54222248d83c1d918734529b2ceab423 Mon Sep 17 00:00:00 2001 From: emiling Date: Mon, 27 Apr 2026 23:45:54 +0900 Subject: [PATCH 6/6] temporalio/sdk-core#1213 make WorkflowHistory stream-based and retain into_events helper --- crates/client/src/workflow_handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 55940828c..960167cc3 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -892,7 +892,7 @@ where let workflow_id = self.info.workflow_id.clone(); let run_id = run_id.to_string(); - // State: (next_page_token, buffer, yielded_count, exhausted) + // State: (next_page_token, buffer, exhausted) let initial_state = (Vec::new(), VecDeque::new(), false); let stream = stream::unfold(