Skip to content
173 changes: 104 additions & 69 deletions crates/client/src/workflow_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ use crate::{
},
grpc::WorkflowService,
};
use std::{fmt::Debug, marker::PhantomData};
use futures_util::{TryStreamExt, stream, stream::Stream};
use std::{
collections::VecDeque,
fmt::Debug,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
pub use temporalio_common::UntypedWorkflow;
use temporalio_common::{
HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
Expand All @@ -24,10 +31,7 @@ use temporalio_common::{
common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
failure::v1::Failure,
history::{
self,
v1::{HistoryEvent, history_event::Attributes},
},
history::v1::{HistoryEvent, history_event::Attributes},
query::v1::WorkflowQuery,
sdk::v1::UserMetadata,
update::{self, v1::WaitPolicy},
Expand Down Expand Up @@ -275,31 +279,29 @@ impl WorkflowExecutionDescription {
}
}

// TODO [rust-sdk-branch]: Could implment stream a-la ListWorkflowsStream
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think rather than having two totally separate APIs, we can just change WorkflowHistory to become like your new stream-based type. Since we're still in pre-release we can change the public API.

We can keep into_events as a helper for people who would rather consume it all at once.

Copy link
Copy Markdown
Author

@emiling emiling Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sushisource Originally, I separated this out to maintain backward compatibility. Since we're still in pre-release and public api changes are fine, I’ve updated it to use the stream-based WorkflowHistory as suggested. Thanks for the suggestion!

/// Workflow execution history returned by `WorkflowHandle::fetch_history`.
#[derive(Debug, Clone)]
/// Workflow execution history, lazily fetched as a stream from the server.
/// Use `into_events` to collect all events at once, or consume as a `stream`.
pub struct WorkflowHistory {
events: Vec<HistoryEvent>,
}
impl From<WorkflowHistory> for history::v1::History {
fn from(h: WorkflowHistory) -> Self {
Self { events: h.events }
}
inner: Pin<Box<dyn Stream<Item = Result<HistoryEvent, WorkflowInteractionError>> + Send>>,
}

impl WorkflowHistory {
fn new(events: Vec<HistoryEvent>) -> Self {
Self { events }
fn new(
inner: Pin<Box<dyn Stream<Item = Result<HistoryEvent, WorkflowInteractionError>> + Send>>,
) -> Self {
Self { inner }
}

/// The history events.
pub fn events(&self) -> &[HistoryEvent] {
&self.events
/// Consume the stream and collect all events into a Vec
pub async fn into_events(self) -> Result<Vec<HistoryEvent>, WorkflowInteractionError> {
self.inner.try_collect().await
}
}

/// Consume the history and return the events.
pub fn into_events(self) -> Vec<HistoryEvent> {
self.events
impl Stream for WorkflowHistory {
type Item = Result<HistoryEvent, WorkflowInteractionError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
}
}

Expand Down Expand Up @@ -462,7 +464,7 @@ where
opts: WorkflowGetResultOptions,
) -> Result<W::Output, WorkflowGetResultError>
where
CT: WorkflowService + NamespacedClient + Clone,
CT: WorkflowService + NamespacedClient + Clone + 'static,
{
let raw = self.get_result_raw(opts).await?;
match raw {
Expand All @@ -487,7 +489,7 @@ where
opts: WorkflowGetResultOptions,
) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
where
CT: WorkflowService + NamespacedClient + Clone,
CT: WorkflowService + NamespacedClient + Clone + 'static,
{
let mut run_id = self.info.run_id.clone().unwrap_or_default();
let fetch_opts = WorkflowFetchHistoryOptions::builder()
Expand All @@ -497,8 +499,8 @@ where
.build();

loop {
let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
let mut events = history.into_events();
let history = self.fetch_history_for_run(&run_id, fetch_opts.clone());
let mut events = history.into_events().await?;

if events.is_empty() {
continue;
Expand Down Expand Up @@ -865,62 +867,95 @@ where
.await
.map_err(WorkflowInteractionError::from)
}
/// Fetch workflow execution history.
pub async fn fetch_history(

/// Fetch workflow execution history as a lazy stream
pub fn fetch_history(
&self,
opts: WorkflowFetchHistoryOptions,
) -> Result<WorkflowHistory, WorkflowInteractionError>
) -> WorkflowHistory
where
CT: NamespacedClient,
CT: NamespacedClient + 'static,
{
let run_id = self.info.run_id.clone().unwrap_or_default();
self.fetch_history_for_run(&run_id, &opts).await
self.fetch_history_for_run(&run_id, opts)
}

/// Fetch history for a specific run_id, handling pagination.
async fn fetch_history_for_run(
fn fetch_history_for_run(
&self,
run_id: &str,
opts: &WorkflowFetchHistoryOptions,
) -> Result<WorkflowHistory, WorkflowInteractionError>
opts: WorkflowFetchHistoryOptions,
) -> WorkflowHistory
where
CT: NamespacedClient,
CT: NamespacedClient + 'static,
{
let mut all_events = Vec::new();
let mut next_page_token = vec![];

loop {
let response = WorkflowService::get_workflow_execution_history(
&mut self.client.clone(),
GetWorkflowExecutionHistoryRequest {
namespace: self.client.namespace(),
execution: Some(ProtoWorkflowExecution {
workflow_id: self.info.workflow_id.clone(),
run_id: run_id.to_string(),
}),
next_page_token: next_page_token.clone(),
skip_archival: opts.skip_archival,
wait_new_event: opts.wait_new_event,
history_event_filter_type: opts.event_filter_type as i32,
..Default::default()
}
.into_request(),
)
.await
.map_err(WorkflowInteractionError::from_status)?
.into_inner();

if let Some(history) = response.history {
all_events.extend(history.events);
}
let client = self.client.clone();
let workflow_id = self.info.workflow_id.clone();
let run_id = run_id.to_string();

// State: (next_page_token, buffer, exhausted)
let initial_state = (Vec::new(), VecDeque::new(), false);

let stream = stream::unfold(
initial_state,
move |(next_page_token, mut buffer, exhausted)| {
let mut client = client.clone();
let namespace = client.namespace();
let workflow_id = workflow_id.clone();
let run_id = run_id.clone();
let opts = opts.clone();

async move {
if let Some(exec) = buffer.pop_front() {
return Some((Ok(exec), (next_page_token, buffer, exhausted)));
}

if response.next_page_token.is_empty() {
break;
}
next_page_token = response.next_page_token;
}
if exhausted {
return None;
}

Ok(WorkflowHistory::new(all_events))
let response = WorkflowService::get_workflow_execution_history(
&mut client,
GetWorkflowExecutionHistoryRequest {
namespace,
execution: Some(ProtoWorkflowExecution {
workflow_id,
run_id,
}),
next_page_token: next_page_token.clone(),
skip_archival: opts.skip_archival,
wait_new_event: opts.wait_new_event,
history_event_filter_type: opts.event_filter_type as i32,
..Default::default()
}
.into_request(),
)
.await;

match response {
Ok(resp) => {
let resp = resp.into_inner();
let new_exhausted = resp.next_page_token.is_empty();
let new_token = resp.next_page_token;

if let Some(history) = resp.history {
buffer = history.events.into_iter().collect();
}

if let Some(event) = buffer.pop_front() {
Some((Ok(event), (new_token, buffer, new_exhausted)))
} else {
None
}
}
Err(e) => Some((
Err(WorkflowInteractionError::from_status(e)),
(next_page_token, buffer, true),
)),
}
}
},
);
WorkflowHistory::new(Box::pin(stream))
}
}

Expand Down
Loading