11import { query } from "@anthropic-ai/claude-agent-sdk" ;
2- import { logger , metadata , schemaTask } from "@trigger.dev/sdk" ;
2+ import { metadata , schemaTask } from "@trigger.dev/sdk" ;
33import { z } from "zod" ;
44import { mkdtemp , rm } from "fs/promises" ;
55import { tmpdir } from "os" ;
@@ -112,6 +112,7 @@ Important: Only include your final analysis in your response. Do not include phr
112112 maxTurns : 10 ,
113113 permissionMode : "acceptEdits" ,
114114 abortController,
115+ includePartialMessages : true , // Enable incremental text streaming
115116 allowedTools : [
116117 "Bash" ,
117118 "Glob" ,
@@ -125,39 +126,31 @@ Important: Only include your final analysis in your response. Do not include phr
125126 metadata . set ( "status" , "Streaming response..." ) ;
126127 metadata . set ( "progress" , 90 ) ;
127128
128- // Create an async generator that yields text strings from the messages
129- const extractText = async function * ( ) {
130- for await ( const message of result ) {
131- logger . debug ( "Message type" , { type : message . type } ) ;
132-
133- // Extract text from assistant messages
134- if ( message . type === "assistant" && message . message ?. content ) {
135- for ( const block of message . message . content ) {
136- if ( block . type === "text" ) {
137- const text = block . text ;
138- logger . debug ( "Streaming text block" , {
139- length : text . length ,
140- preview : text . slice ( 0 , 100 ) ,
141- } ) ;
142-
143- // Split large text blocks into smaller chunks for better streaming
144- // This helps when Claude sends a large response in a single message
145- const chunkSize = 20 ; // Send 20 chars at a time
146- if ( text . length > chunkSize ) {
147- for ( let i = 0 ; i < text . length ; i += chunkSize ) {
148- yield text . slice ( i , i + chunkSize ) ;
149- }
150- } else {
151- yield text ;
129+ // Stream text using writer API
130+ const { waitUntilComplete } = agentStream . writer ( {
131+ execute : async ( { write } ) => {
132+ for await ( const message of result ) {
133+ // Handle incremental text deltas from stream events
134+ if ( message . type === "stream_event" ) {
135+ const event = message . event ;
136+ if (
137+ event . type === "content_block_delta" &&
138+ event . delta . type === "text_delta"
139+ ) {
140+ write ( event . delta . text ) ;
141+ }
142+ } // Fallback: handle complete assistant messages
143+ else if ( message . type === "assistant" && message . message ?. content ) {
144+ for ( const block of message . message . content ) {
145+ if ( block . type === "text" ) {
146+ write ( block . text ) ;
152147 }
153148 }
154149 }
155150 }
156- }
157- } ;
151+ } ,
152+ } ) ;
158153
159- // Use pipe to stream the text - this properly handles backpressure
160- const { waitUntilComplete } = agentStream . pipe ( extractText ( ) ) ;
161154 await waitUntilComplete ( ) ;
162155
163156 metadata . set ( "status" , "Completed" ) ;
0 commit comments