Skip to content

Commit b1caeb0

Browse files
committed
cleanup subagent + streaming issues
1 parent a738a6d commit b1caeb0

23 files changed

Lines changed: 361 additions & 111 deletions

File tree

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
processContextsServer,
1616
resolveActiveResourceContext,
1717
} from '@/lib/copilot/chat/process-contents'
18+
import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state'
1819
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants'
1920
import {
2021
createBadRequestResponse,
@@ -378,6 +379,7 @@ export async function POST(req: NextRequest) {
378379
autoExecuteTools: true,
379380
interactive: true,
380381
onComplete: buildOnComplete(actualChatId, userMessageIdToUse, tracker.requestId),
382+
onError: buildOnError(actualChatId, userMessageIdToUse, tracker.requestId),
381383
},
382384
})
383385

@@ -419,34 +421,16 @@ function buildOnComplete(
419421
requestId: string
420422
): (result: OrchestratorResult) => Promise<void> {
421423
return async (result) => {
422-
if (!chatId || !result.success) return
423-
424-
const assistantMessage = buildPersistedAssistantMessage(result, result.requestId)
424+
if (!chatId) return
425425

426426
try {
427-
const [row] = await db
428-
.select({ messages: copilotChats.messages })
429-
.from(copilotChats)
430-
.where(eq(copilotChats.id, chatId))
431-
.limit(1)
432-
433-
const msgs: Record<string, unknown>[] = Array.isArray(row?.messages) ? row.messages : []
434-
const userIdx = msgs.findIndex((m: Record<string, unknown>) => m.id === userMessageId)
435-
const alreadyHasResponse =
436-
userIdx >= 0 &&
437-
userIdx + 1 < msgs.length &&
438-
(msgs[userIdx + 1] as Record<string, unknown>)?.role === 'assistant'
439-
440-
if (!alreadyHasResponse) {
441-
await db
442-
.update(copilotChats)
443-
.set({
444-
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
445-
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`,
446-
updatedAt: new Date(),
447-
})
448-
.where(eq(copilotChats.id, chatId))
449-
}
427+
await finalizeAssistantTurn({
428+
chatId,
429+
userMessageId,
430+
...(result.success
431+
? { assistantMessage: buildPersistedAssistantMessage(result, result.requestId) }
432+
: {}),
433+
})
450434
} catch (error) {
451435
logger.error(`[${requestId}] Failed to persist chat messages`, {
452436
chatId,
@@ -456,6 +440,25 @@ function buildOnComplete(
456440
}
457441
}
458442

443+
function buildOnError(
444+
chatId: string | undefined,
445+
userMessageId: string,
446+
requestId: string
447+
): () => Promise<void> {
448+
return async () => {
449+
if (!chatId) return
450+
451+
try {
452+
await finalizeAssistantTurn({ chatId, userMessageId })
453+
} catch (error) {
454+
logger.error(`[${requestId}] Failed to finalize errored chat stream`, {
455+
chatId,
456+
error: error instanceof Error ? error.message : 'Unknown error',
457+
})
458+
}
459+
}
460+
}
461+
459462
// ---------------------------------------------------------------------------
460463
// GET handler (read-only queries, extracted to queries.ts)
461464
// ---------------------------------------------------------------------------

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
processContextsServer,
1616
resolveActiveResourceContext,
1717
} from '@/lib/copilot/chat/process-contents'
18+
import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state'
1819
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
1920
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request/http'
2021
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start'
@@ -295,47 +296,46 @@ export async function POST(req: NextRequest) {
295296
interactive: true,
296297
onComplete: async (result: OrchestratorResult) => {
297298
if (!actualChatId) return
298-
if (!result.success) return
299-
300-
const assistantMessage = buildPersistedAssistantMessage(result, result.requestId)
301299

302300
try {
303-
const [row] = await db
304-
.select({ messages: copilotChats.messages })
305-
.from(copilotChats)
306-
.where(eq(copilotChats.id, actualChatId))
307-
.limit(1)
308-
309-
const msgs: any[] = Array.isArray(row?.messages) ? row.messages : []
310-
const userIdx = msgs.findIndex((m: any) => m.id === userMessageId)
311-
const alreadyHasResponse =
312-
userIdx >= 0 &&
313-
userIdx + 1 < msgs.length &&
314-
(msgs[userIdx + 1] as any)?.role === 'assistant'
315-
316-
if (!alreadyHasResponse) {
317-
await db
318-
.update(copilotChats)
319-
.set({
320-
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
321-
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`,
322-
updatedAt: new Date(),
323-
})
324-
.where(eq(copilotChats.id, actualChatId))
325-
326-
taskPubSub?.publishStatusChanged({
327-
workspaceId,
328-
chatId: actualChatId,
329-
type: 'completed',
330-
})
331-
}
301+
await finalizeAssistantTurn({
302+
chatId: actualChatId,
303+
userMessageId,
304+
...(result.success
305+
? { assistantMessage: buildPersistedAssistantMessage(result, result.requestId) }
306+
: {}),
307+
})
308+
taskPubSub?.publishStatusChanged({
309+
workspaceId,
310+
chatId: actualChatId,
311+
type: 'completed',
312+
})
332313
} catch (error) {
333314
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
334315
chatId: actualChatId,
335316
error: error instanceof Error ? error.message : 'Unknown error',
336317
})
337318
}
338319
},
320+
onError: async () => {
321+
if (!actualChatId) return
322+
try {
323+
await finalizeAssistantTurn({
324+
chatId: actualChatId,
325+
userMessageId,
326+
})
327+
taskPubSub?.publishStatusChanged({
328+
workspaceId,
329+
chatId: actualChatId,
330+
type: 'completed',
331+
})
332+
} catch (error) {
333+
logger.error(`[${tracker.requestId}] Failed to finalize errored chat stream`, {
334+
chatId: actualChatId,
335+
error: error instanceof Error ? error.message : 'Unknown error',
336+
})
337+
}
338+
},
339339
},
340340
})
341341

apps/sim/app/api/mothership/chat/stop/route.ts

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,38 @@ export async function POST(req: NextRequest) {
7070

7171
const { chatId, streamId, content, contentBlocks } = StopSchema.parse(await req.json())
7272

73-
await releasePendingChatStream(chatId, streamId)
73+
const [row] = await db
74+
.select({
75+
workspaceId: copilotChats.workspaceId,
76+
messages: copilotChats.messages,
77+
})
78+
.from(copilotChats)
79+
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id)))
80+
.limit(1)
81+
82+
if (!row) {
83+
await releasePendingChatStream(chatId, streamId)
84+
return NextResponse.json({ success: true })
85+
}
86+
87+
const messages: Record<string, unknown>[] = Array.isArray(row.messages) ? row.messages : []
88+
const userIdx = messages.findIndex((message) => message.id === streamId)
89+
const alreadyHasResponse =
90+
userIdx >= 0 &&
91+
userIdx + 1 < messages.length &&
92+
(messages[userIdx + 1] as Record<string, unknown>)?.role === 'assistant'
93+
const canAppendAssistant =
94+
userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse
7495

7596
const setClause: Record<string, unknown> = {
76-
conversationId: null,
97+
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${streamId} THEN NULL ELSE ${copilotChats.conversationId} END`,
7798
updatedAt: new Date(),
7899
}
79100

80101
const hasContent = content.trim().length > 0
81102
const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0
82103

83-
if (hasContent || hasBlocks) {
104+
if ((hasContent || hasBlocks) && canAppendAssistant) {
84105
const normalized = normalizeMessage({
85106
id: crypto.randomUUID(),
86107
role: 'assistant',
@@ -95,15 +116,11 @@ export async function POST(req: NextRequest) {
95116
const [updated] = await db
96117
.update(copilotChats)
97118
.set(setClause)
98-
.where(
99-
and(
100-
eq(copilotChats.id, chatId),
101-
eq(copilotChats.userId, session.user.id),
102-
eq(copilotChats.conversationId, streamId)
103-
)
104-
)
119+
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id)))
105120
.returning({ workspaceId: copilotChats.workspaceId })
106121

122+
await releasePendingChatStream(chatId, streamId)
123+
107124
if (updated?.workspaceId) {
108125
taskPubSub?.publishStatusChanged({
109126
workspaceId: updated.workspaceId,

apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/chat-content/chat-content.tsx

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,16 @@ interface ChatContentProps {
172172
content: string
173173
isStreaming?: boolean
174174
onOptionSelect?: (id: string) => void
175+
smoothStreaming?: boolean
175176
}
176177

177-
export function ChatContent({ content, isStreaming = false, onOptionSelect }: ChatContentProps) {
178-
const rendered = useStreamingText(content, isStreaming)
178+
export function ChatContent({
179+
content,
180+
isStreaming = false,
181+
onOptionSelect,
182+
smoothStreaming = true,
183+
}: ChatContentProps) {
184+
const rendered = useStreamingText(content, isStreaming && smoothStreaming)
179185

180186
const parsed = useMemo(() => parseSpecialTags(rendered, isStreaming), [rendered, isStreaming])
181187
const hasSpecialContent = parsed.hasPendingTag || parsed.segments.some((s) => s.type !== 'text')

apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ export function MessageContent({
372372
const hasSubagentEnded = blocks.some((b) => b.type === 'subagent_end')
373373
const showTrailingThinking =
374374
isStreaming && !hasTrailingContent && (hasSubagentEnded || allLastGroupToolsDone)
375+
const hasStructuredSegments = segments.some((segment) => segment.type !== 'text')
375376
const lastOpenSubagentGroupId = [...segments]
376377
.reverse()
377378
.find(
@@ -390,6 +391,7 @@ export function MessageContent({
390391
content={segment.content}
391392
isStreaming={isStreaming}
392393
onOptionSelect={onOptionSelect}
394+
smoothStreaming={!hasStructuredSegments}
393395
/>
394396
)
395397
case 'agent_group': {

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,14 +1091,14 @@ export function useChat(
10911091
break
10921092
}
10931093
const tc = blocks[idx].toolCall!
1094-
const resultObj = asPayloadRecord(payload.result)
1094+
const outputObj = asPayloadRecord(payload.output)
10951095
const success =
10961096
typeof payload.success === 'boolean'
10971097
? payload.success
10981098
: payload.status === MothershipStreamV1ToolOutcome.success
10991099
const isCancelled =
1100-
resultObj?.reason === 'user_cancelled' ||
1101-
resultObj?.cancelledByUser === true ||
1100+
outputObj?.reason === 'user_cancelled' ||
1101+
outputObj?.cancelledByUser === true ||
11021102
payload.reason === 'user_cancelled' ||
11031103
payload.cancelledByUser === true ||
11041104
payload.status === MothershipStreamV1ToolOutcome.cancelled
@@ -1112,12 +1112,7 @@ export function useChat(
11121112
tc.streamingArgs = undefined
11131113
tc.result = {
11141114
success: !!success,
1115-
output:
1116-
payload.result !== undefined
1117-
? payload.result
1118-
: payload.output !== undefined
1119-
? payload.output
1120-
: payload.data,
1115+
output: payload.output,
11211116
error: typeof payload.error === 'string' ? payload.error : undefined,
11221117
}
11231118
flush()

apps/sim/app/workspace/[workspaceId]/home/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ export const ContentBlockType = {
135135
subagent: 'subagent',
136136
subagent_end: 'subagent_end',
137137
subagent_text: 'subagent_text',
138+
subagent_thinking: 'subagent_thinking',
138139
options: 'options',
139140
stopped: 'stopped',
140141
} as const

apps/sim/lib/copilot/chat/display-message.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ function toDisplayBlock(block: PersistedContentBlock): ContentBlock {
4646
switch (block.type) {
4747
case MothershipStreamV1EventType.text:
4848
if (block.lane === 'subagent') {
49+
if (block.channel === 'thinking') {
50+
return { type: ContentBlockType.subagent_thinking, content: block.content }
51+
}
4952
return { type: ContentBlockType.subagent_text, content: block.content }
5053
}
5154
return { type: ContentBlockType.text, content: block.content }

apps/sim/lib/copilot/chat/persisted-message.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ function mapContentBlock(block: ContentBlock): PersistedContentBlock {
113113
channel: MothershipStreamV1TextChannel.assistant,
114114
content: block.content,
115115
}
116+
case 'subagent_thinking':
117+
return {
118+
type: MothershipStreamV1EventType.text,
119+
lane: 'subagent',
120+
channel: MothershipStreamV1TextChannel.thinking,
121+
content: block.content,
122+
}
116123
case 'tool_call': {
117124
if (!block.toolCall) {
118125
return {
@@ -274,7 +281,7 @@ function normalizeCanonicalBlock(block: RawBlock): PersistedContentBlock {
274281
const result: PersistedContentBlock = {
275282
type: block.type as MothershipStreamV1EventType,
276283
}
277-
if (block.lane === 'main' || block.lane === 'subagent') {
284+
if (block.lane === 'subagent') {
278285
result.lane = block.lane
279286
}
280287
const blockContent = block.content ?? block.text
@@ -349,6 +356,15 @@ function normalizeLegacyBlock(block: RawBlock): PersistedContentBlock {
349356
}
350357
}
351358

359+
if (block.type === 'subagent_thinking') {
360+
return {
361+
type: MothershipStreamV1EventType.text,
362+
lane: 'subagent',
363+
channel: MothershipStreamV1TextChannel.thinking,
364+
content: block.content,
365+
}
366+
}
367+
352368
if (block.type === 'subagent_end') {
353369
return {
354370
type: MothershipStreamV1EventType.span,

0 commit comments

Comments
 (0)