diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 976dd07e7..960167cc3 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -8,7 +8,14 @@ use crate::{ }, grpc::WorkflowService, }; -use std::{fmt::Debug, marker::PhantomData}; +use futures_util::{TryStreamExt, stream, stream::Stream}; +use std::{ + collections::VecDeque, + fmt::Debug, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; pub use temporalio_common::UntypedWorkflow; use temporalio_common::{ HasWorkflowDefinition, QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition, @@ -24,10 +31,7 @@ use temporalio_common::{ common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution}, enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage}, failure::v1::Failure, - history::{ - self, - v1::{HistoryEvent, history_event::Attributes}, - }, + history::v1::{HistoryEvent, history_event::Attributes}, query::v1::WorkflowQuery, sdk::v1::UserMetadata, update::{self, v1::WaitPolicy}, @@ -275,31 +279,29 @@ impl WorkflowExecutionDescription { } } -// TODO [rust-sdk-branch]: Could implment stream a-la ListWorkflowsStream -/// Workflow execution history returned by `WorkflowHandle::fetch_history`. -#[derive(Debug, Clone)] +/// Workflow execution history, lazily fetched as a stream from the server. +/// Use `into_events` to collect all events at once, or consume as a `stream`. pub struct WorkflowHistory { - events: Vec, -} -impl From for history::v1::History { - fn from(h: WorkflowHistory) -> Self { - Self { events: h.events } - } + inner: Pin> + Send>>, } impl WorkflowHistory { - fn new(events: Vec) -> Self { - Self { events } + fn new( + inner: Pin> + Send>>, + ) -> Self { + Self { inner } } - /// The history events. - pub fn events(&self) -> &[HistoryEvent] { - &self.events + /// Consume the stream and collect all events into a Vec + pub async fn into_events(self) -> Result, WorkflowInteractionError> { + self.inner.try_collect().await } +} - /// Consume the history and return the events. - pub fn into_events(self) -> Vec { - self.events +impl Stream for WorkflowHistory { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.as_mut().poll_next(cx) } } @@ -462,7 +464,7 @@ where opts: WorkflowGetResultOptions, ) -> Result where - CT: WorkflowService + NamespacedClient + Clone, + CT: WorkflowService + NamespacedClient + Clone + 'static, { let raw = self.get_result_raw(opts).await?; match raw { @@ -487,7 +489,7 @@ where opts: WorkflowGetResultOptions, ) -> Result, WorkflowInteractionError> where - CT: WorkflowService + NamespacedClient + Clone, + CT: WorkflowService + NamespacedClient + Clone + 'static, { let mut run_id = self.info.run_id.clone().unwrap_or_default(); let fetch_opts = WorkflowFetchHistoryOptions::builder() @@ -497,8 +499,8 @@ where .build(); loop { - let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?; - let mut events = history.into_events(); + let history = self.fetch_history_for_run(&run_id, fetch_opts.clone()); + let mut events = history.into_events().await?; if events.is_empty() { continue; @@ -865,62 +867,95 @@ where .await .map_err(WorkflowInteractionError::from) } - /// Fetch workflow execution history. - pub async fn fetch_history( + + /// Fetch workflow execution history as a lazy stream + pub fn fetch_history( &self, opts: WorkflowFetchHistoryOptions, - ) -> Result + ) -> WorkflowHistory where - CT: NamespacedClient, + CT: NamespacedClient + 'static, { let run_id = self.info.run_id.clone().unwrap_or_default(); - self.fetch_history_for_run(&run_id, &opts).await + self.fetch_history_for_run(&run_id, opts) } - /// Fetch history for a specific run_id, handling pagination. - async fn fetch_history_for_run( + fn fetch_history_for_run( &self, run_id: &str, - opts: &WorkflowFetchHistoryOptions, - ) -> Result + opts: WorkflowFetchHistoryOptions, + ) -> WorkflowHistory where - CT: NamespacedClient, + CT: NamespacedClient + 'static, { - let mut all_events = Vec::new(); - let mut next_page_token = vec![]; - - loop { - let response = WorkflowService::get_workflow_execution_history( - &mut self.client.clone(), - GetWorkflowExecutionHistoryRequest { - namespace: self.client.namespace(), - execution: Some(ProtoWorkflowExecution { - workflow_id: self.info.workflow_id.clone(), - run_id: run_id.to_string(), - }), - next_page_token: next_page_token.clone(), - skip_archival: opts.skip_archival, - wait_new_event: opts.wait_new_event, - history_event_filter_type: opts.event_filter_type as i32, - ..Default::default() - } - .into_request(), - ) - .await - .map_err(WorkflowInteractionError::from_status)? - .into_inner(); - - if let Some(history) = response.history { - all_events.extend(history.events); - } + let client = self.client.clone(); + let workflow_id = self.info.workflow_id.clone(); + let run_id = run_id.to_string(); + + // State: (next_page_token, buffer, exhausted) + let initial_state = (Vec::new(), VecDeque::new(), false); + + let stream = stream::unfold( + initial_state, + move |(next_page_token, mut buffer, exhausted)| { + let mut client = client.clone(); + let namespace = client.namespace(); + let workflow_id = workflow_id.clone(); + let run_id = run_id.clone(); + let opts = opts.clone(); + + async move { + if let Some(exec) = buffer.pop_front() { + return Some((Ok(exec), (next_page_token, buffer, exhausted))); + } - if response.next_page_token.is_empty() { - break; - } - next_page_token = response.next_page_token; - } + if exhausted { + return None; + } - Ok(WorkflowHistory::new(all_events)) + let response = WorkflowService::get_workflow_execution_history( + &mut client, + GetWorkflowExecutionHistoryRequest { + namespace, + execution: Some(ProtoWorkflowExecution { + workflow_id, + run_id, + }), + next_page_token: next_page_token.clone(), + skip_archival: opts.skip_archival, + wait_new_event: opts.wait_new_event, + history_event_filter_type: opts.event_filter_type as i32, + ..Default::default() + } + .into_request(), + ) + .await; + + match response { + Ok(resp) => { + let resp = resp.into_inner(); + let new_exhausted = resp.next_page_token.is_empty(); + let new_token = resp.next_page_token; + + if let Some(history) = resp.history { + buffer = history.events.into_iter().collect(); + } + + if let Some(event) = buffer.pop_front() { + Some((Ok(event), (new_token, buffer, new_exhausted))) + } else { + None + } + } + Err(e) => Some(( + Err(WorkflowInteractionError::from_status(e)), + (next_page_token, buffer, true), + )), + } + } + }, + ); + WorkflowHistory::new(Box::pin(stream)) } }