Skip to content

Commit f90a8bf

Browse files
committed
support one-shot streams w/the sse extension
1 parent 6318885 commit f90a8bf

6 files changed

Lines changed: 267 additions & 34 deletions

File tree

src/core/runtime/conversions.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ export const conversions = {
149149
return Array.from(val).flat();
150150
},
151151
HTML: _toHTML,
152+
Stream: function () {
153+
throw new Error("The Stream conversion requires the SSE extension. " +
154+
"Include dist/ext/sse.js or dist/ext/sse.esm.js after hyperscript.");
155+
},
152156
Fragment: function (val, runtime) {
153157
var frag = document.createDocumentFragment();
154158
runtime.implicitLoop(val, (val) => {

src/ext/eventsource.js

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,85 @@ function decode(data, encoding) {
362362
return data;
363363
}
364364

365+
// ========================================
366+
// STREAM CONVERSION (fetch ... as Stream)
367+
// ========================================
368+
369+
function createStream(response, runtime, context) {
370+
var element = context.me;
371+
var reader = response.body.getReader();
372+
var messages = [];
373+
var waiting = null;
374+
var done = false;
375+
376+
// Read SSE events in the background, dispatch named events on the
377+
// element, and buffer unnamed messages for async iteration.
378+
(async function () {
379+
try {
380+
for await (var msg of parseSSE(reader)) {
381+
var eventType = msg.event || 'message';
382+
if (msg.event) {
383+
runtime.triggerEvent(element, eventType, {
384+
data: msg.data,
385+
lastEventId: msg.id || ''
386+
});
387+
} else {
388+
messages.push(msg.data);
389+
if (waiting) {
390+
waiting.resolve({ value: msg.data, done: false });
391+
waiting = null;
392+
}
393+
}
394+
}
395+
} catch (err) {
396+
runtime.triggerEvent(element, 'stream-error', { error: err });
397+
}
398+
done = true;
399+
if (waiting) {
400+
waiting.resolve({ value: undefined, done: true });
401+
waiting = null;
402+
}
403+
runtime.triggerEvent(element, 'streamEnd', {});
404+
})();
405+
406+
var stream = {
407+
element: element,
408+
[Symbol.asyncIterator]: function () {
409+
var index = 0;
410+
return {
411+
next: function () {
412+
if (index < messages.length) {
413+
return Promise.resolve({ value: messages[index++], done: false });
414+
}
415+
if (done) {
416+
return Promise.resolve({ value: undefined, done: true });
417+
}
418+
return new Promise(function (resolve) {
419+
waiting = { resolve: resolve };
420+
}).then(function (result) {
421+
if (!result.done) index++;
422+
return result;
423+
});
424+
}
425+
};
426+
}
427+
};
428+
429+
return stream;
430+
}
431+
432+
var streamConversion = function (response, runtime, context) {
433+
return createStream(response, runtime, context);
434+
};
435+
streamConversion._rawResponse = true;
436+
365437
// ========================================
366438
// PLUGIN REGISTRATION
367439
// ========================================
368440

369441
export default function eventsourcePlugin(_hyperscript) {
370442
_hyperscript.addFeature(EventSourceFeature.keyword, EventSourceFeature.parse.bind(EventSourceFeature));
443+
_hyperscript.config.conversions.Stream = streamConversion;
371444
}
372445

373446
if (typeof self !== 'undefined' && self._hyperscript) {

src/parsetree/commands/basic.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,12 @@ export class FetchCommand extends Command {
654654

655655
if (this.conversionType === "response") return complete(resp);
656656
if (this.conversionType === "json") return resp.json().then(complete);
657+
if (this.conversion) {
658+
var convFn = config.conversions[this.conversion];
659+
if (convFn && convFn._rawResponse) {
660+
return complete(convFn(resp, context.meta.runtime, context));
661+
}
662+
}
657663
return resp.text().then((result) => {
658664
if (this.conversion) result = context.meta.runtime.convertValue(result, this.conversion);
659665
if (this.conversionType === "html") result = context.meta.runtime.convertValue(result, "Fragment");

src/parsetree/commands/controlflow.js

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -68,49 +68,64 @@ class RepeatLoopCommand extends Command {
6868
} else if (times) {
6969
keepLooping = iteratorInfo.index < times;
7070
} else if (iteratorInfo.iterator) {
71+
if (iteratorInfo.async) {
72+
var self = this;
73+
return iteratorInfo.iterator.next().then(function(result) {
74+
if (result.done) {
75+
return self._endLoop(context, iteratorInfo);
76+
}
77+
return self._continueLoop(context, iteratorInfo, result.value);
78+
});
79+
}
7180
var nextValFromIterator = iteratorInfo.iterator.next();
7281
keepLooping = !nextValFromIterator.done;
7382
loopVal = nextValFromIterator.value;
7483
}
7584

7685
if (keepLooping) {
77-
var currentIndex = iteratorInfo.index;
78-
if (iteratorInfo.value) {
79-
context.result = context.locals[this.identifier] = loopVal;
80-
} else {
81-
context.result = currentIndex;
82-
}
83-
if (this.indexIdentifier) {
84-
context.locals[this.indexIdentifier] = currentIndex;
85-
}
86-
87-
// In template mode, emit a scope marker so processNode can
88-
// resolve loop variables on elements with _= attributes
89-
if (context.meta.__ht_template_result && iteratorInfo.value) {
90-
var scopes = context.meta.__ht_scopes || (context.meta.__ht_scopes = {});
91-
if (!scopes[this.slot]) {
92-
scopes[this.slot] = {
93-
identifier: this.identifier,
94-
indexIdentifier: this.indexIdentifier,
95-
source: iteratorInfo.value
96-
};
97-
}
98-
context.meta.__ht_template_result.push(
99-
'<!--hs-scope:' + this.slot + ':' + currentIndex + '-->'
100-
);
101-
}
86+
return this._continueLoop(context, iteratorInfo, loopVal);
87+
} else {
88+
return this._endLoop(context, iteratorInfo);
89+
}
90+
}
10291

103-
iteratorInfo.didIterate = true;
104-
iteratorInfo.index++;
105-
return this.loop;
92+
_continueLoop(context, iteratorInfo, loopVal) {
93+
var currentIndex = iteratorInfo.index;
94+
if (iteratorInfo.value) {
95+
context.result = context.locals[this.identifier] = loopVal;
10696
} else {
107-
var didIterate = iteratorInfo.didIterate;
108-
context.meta.iterators[this.slot] = null;
109-
if (!didIterate && this.elseBranch) {
110-
return this.elseBranch;
97+
context.result = currentIndex;
98+
}
99+
if (this.indexIdentifier) {
100+
context.locals[this.indexIdentifier] = currentIndex;
101+
}
102+
103+
if (context.meta.__ht_template_result && iteratorInfo.value) {
104+
var scopes = context.meta.__ht_scopes || (context.meta.__ht_scopes = {});
105+
if (!scopes[this.slot]) {
106+
scopes[this.slot] = {
107+
identifier: this.identifier,
108+
indexIdentifier: this.indexIdentifier,
109+
source: iteratorInfo.value
110+
};
111111
}
112-
return context.meta.runtime.findNext(this.parent, context);
112+
context.meta.__ht_template_result.push(
113+
'<!--hs-scope:' + this.slot + ':' + currentIndex + '-->'
114+
);
115+
}
116+
117+
iteratorInfo.didIterate = true;
118+
iteratorInfo.index++;
119+
return this.loop;
120+
}
121+
122+
_endLoop(context, iteratorInfo) {
123+
var didIterate = iteratorInfo.didIterate;
124+
context.meta.iterators[this.slot] = null;
125+
if (!didIterate && this.elseBranch) {
126+
return this.elseBranch;
113127
}
128+
return context.meta.runtime.findNext(this.parent, context);
114129
}
115130
}
116131

@@ -325,7 +340,10 @@ export class RepeatCommand extends Command {
325340
context.meta.iterators[this.slot] = iteratorInfo;
326341

327342
if (value) {
328-
if (value[Symbol.iterator]) {
343+
if (value[Symbol.asyncIterator]) {
344+
iteratorInfo.iterator = value[Symbol.asyncIterator]();
345+
iteratorInfo.async = true;
346+
} else if (value[Symbol.iterator]) {
329347
iteratorInfo.iterator = value[Symbol.iterator]();
330348
} else {
331349
iteratorInfo.iterator = Object.keys(value)[Symbol.iterator]();

test/ext/eventsource.js

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,60 @@ test.describe('eventsource extension', () => {
266266
await restoreFetch(page);
267267
});
268268
})
269+
270+
test.describe('fetch as Stream', () => {
271+
272+
test('dispatches named SSE events on the element', async ({html, find, page}) => {
273+
await mockSSE(page, '/stream-named', [
274+
{ event: 'status', data: 'loading', delay: 10 },
275+
{ event: 'status', data: 'done', delay: 10 },
276+
]);
277+
await html(`
278+
<div id="out"></div>
279+
<button _="on click
280+
fetch /stream-named as Stream
281+
on status from me
282+
put event.detail.data into #out
283+
end">go</button>
284+
`);
285+
await find('button').click();
286+
await expect.poll(() => find('#out').textContent()).toBe('done');
287+
await restoreFetch(page);
288+
});
289+
290+
test('iterates plain messages with for loop', async ({html, find, page}) => {
291+
await mockSSE(page, '/stream-iter', [
292+
{ data: 'one', delay: 10 },
293+
{ data: 'two', delay: 10 },
294+
{ data: 'three', delay: 10 },
295+
]);
296+
await html(`
297+
<div id="out"></div>
298+
<button _="on click
299+
fetch /stream-iter as Stream
300+
for message in the result
301+
put message + ' ' at end of #out
302+
end">go</button>
303+
`);
304+
await find('button').click();
305+
await expect.poll(() => find('#out').textContent()).toBe('one two three ');
306+
await restoreFetch(page);
307+
});
308+
309+
test('fires streamEnd when the stream closes', async ({html, find, page}) => {
310+
await mockSSE(page, '/streamEnd', [
311+
{ data: 'msg', delay: 50 },
312+
{ data: 'msg2', delay: 50 },
313+
]);
314+
await html(`
315+
<div id="out"></div>
316+
<button _="on click
317+
fetch /streamEnd as Stream
318+
wait for streamEnd from me
319+
put 'finished' into #out">go</button>
320+
`);
321+
await find('button').click();
322+
await expect.poll(() => find('#out').textContent()).toBe('finished');
323+
await restoreFetch(page);
324+
});
325+
})

www/features/eventsource.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,88 @@ When a connection drops, hyperscript automatically reconnects with exponential b
203203

204204
Calling `close()` stops reconnection. Calling `open()` again resumes it.
205205

206+
### `fetch` as Stream
207+
208+
The SSE extension also adds a `Stream` conversion to the [`fetch`](/commands/fetch) command.
209+
210+
This is useful for one-shot SSE streams (AI chat responses, progress updates, file processing) where you don't need the long-lived connection management of `eventsource`.
211+
212+
~~~ hyperscript
213+
fetch /api/generate as Stream
214+
~~~
215+
216+
The result is a stream object. Named events from the server are dispatched on the current element, and unnamed messages can be iterated with a `for` loop:
217+
218+
#### Named events
219+
220+
If the server sends named SSE events (with an `event:` field), they are dispatched
221+
as DOM events on the element that initiated the fetch. Handle them with `on`:
222+
223+
~~~ hyperscript
224+
on click
225+
fetch /api/process as Stream
226+
end
227+
on status -- from SSE stream
228+
put event.detail.data into #progress
229+
end
230+
~~~
231+
232+
#### Iterating messages
233+
234+
Unnamed messages (plain `data:` lines with no `event:` field) are available as
235+
an async iterable. Use `for ... in the result` to process each message as it
236+
arrives:
237+
238+
~~~ hyperscript
239+
on click
240+
fetch /api/chat as Stream
241+
for message in the result
242+
put message at end of #output
243+
end
244+
end
245+
~~~
246+
247+
The loop body runs once per message, blocking until the next one arrives. When the stream closes, the loop exits normally.
248+
249+
#### Waiting for the stream to end
250+
251+
The stream fires a `streamEnd` event on the element when the server closes the
252+
connection:
253+
254+
~~~ hyperscript
255+
on click
256+
fetch /api/export as Stream
257+
wait for streamEnd
258+
put "Done!" into me
259+
end
260+
~~~
261+
262+
#### Error handling
263+
264+
If the stream encounters an error, a `streamError` event is dispatched on the element with an `error` property in the detail:
265+
266+
~~~ hyperscript
267+
on click
268+
fetch /api/generate as Stream
269+
on streamError from me
270+
put "Connection lost" into #status
271+
end
272+
end
273+
~~~
274+
275+
{% note %}
276+
Without the SSE extension loaded, `fetch ... as Stream` throws an error telling you to include the extension.
277+
{% endnote %}
278+
206279
### Syntax
207280

208281
```ebnf
209282
eventsource <source-name> [from <source-url>]
210283
[with credentials] [with method <string>] [with headers <object>]
211284
(on <event-pattern> [as (JSON | string)] <command>+ end)*
212285
end
286+
287+
fetch <url> as Stream -- one-shot SSE via the Stream conversion
213288
```
214289

215290
Event patterns support `*` as a wildcard (e.g. `"user.*"`, `"*"`).

0 commit comments

Comments
 (0)