diff --git a/bun.lock b/bun.lock index 6b1fd218f..d03bf6f63 100644 --- a/bun.lock +++ b/bun.lock @@ -283,6 +283,7 @@ "name": "@moq/smoke-native", "dependencies": { "@moq/hang": "workspace:*", + "@moq/json": "workspace:*", "@moq/net": "workspace:*", "@moq/web-transport": "^0.1.2", "zod": "^4.0.0", diff --git a/test/smoke/clients/c/subscribe.c b/test/smoke/clients/c/subscribe.c index 25c157b2a..55bf2fcdb 100644 --- a/test/smoke/clients/c/subscribe.c +++ b/test/smoke/clients/c/subscribe.c @@ -23,6 +23,7 @@ typedef struct { pthread_cond_t cv; int got; // a non-empty frame arrived int video_started; // guard: start the video track only once + int32_t broadcast; // handle delivered by moq_origin_consume_announced (0 until it arrives) } ctx_t; // Callbacks run on libmoq's runtime thread; main waits on the condvar. ctx @@ -47,6 +48,19 @@ static void on_frame(void *ud, int32_t frame) { moq_consume_frame_close((uint32_t)frame); } +// Delivers the broadcast handle once it's announced, then once more with a +// terminal code (<= 0) we ignore. Store the first positive handle and wake main. +static void on_broadcast(void *ud, int32_t broadcast) { + ctx_t *c = (ctx_t *)ud; + if (broadcast <= 0) return; // 0 = ended, negative = error + pthread_mutex_lock(&c->mu); + if (c->broadcast <= 0) { + c->broadcast = broadcast; + pthread_cond_signal(&c->cv); + } + pthread_mutex_unlock(&c->mu); +} + static void on_catalog(void *ud, int32_t catalog) { ctx_t *c = (ctx_t *)ud; if (catalog <= 0) return; @@ -91,6 +105,7 @@ int main(int argc, char **argv) { pthread_cond_init(&c.cv, NULL); c.got = 0; c.video_started = 0; + c.broadcast = 0; int32_t origin = moq_origin_create(); if (origin <= 0) { @@ -109,20 +124,23 @@ int main(int argc, char **argv) { clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_sec += (time_t)timeout_s; - // moq_origin_consume is a synchronous lookup, but the broadcast arrives over - // the network after connect. Retry until it's announced (or we run out of - // time). We don't enable libmoq logging, so the misses stay quiet. - int32_t bc = -1; - while (1) { - bc = moq_origin_consume((uint32_t)origin, broadcast, strlen(broadcast)); - if (bc > 0) break; - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - if (now.tv_sec >= deadline.tv_sec) break; - usleep(150 * 1000); + // The broadcast arrives over the network after connect, so wait for it to be + // announced. moq_origin_consume_announced resolves via on_broadcast once it's + // available; we block on the condvar until then (or the deadline). + int32_t wait = moq_origin_consume_announced((uint32_t)origin, broadcast, strlen(broadcast), on_broadcast, &c); + if (wait <= 0) { + fprintf(stderr, "error: moq_origin_consume_announced failed: %d\n", wait); + return 1; + } + + pthread_mutex_lock(&c.mu); + while (c.broadcast <= 0) { + if (pthread_cond_timedwait(&c.cv, &c.mu, &deadline) != 0) break; // timed out } + int32_t bc = c.broadcast; + pthread_mutex_unlock(&c.mu); if (bc <= 0) { - fprintf(stderr, "error: broadcast never announced (moq_origin_consume: %d)\n", bc); + fprintf(stderr, "error: broadcast never announced\n"); return 1; } diff --git a/test/smoke/clients/js-native/package.json b/test/smoke/clients/js-native/package.json index a608b438d..f62605571 100644 --- a/test/smoke/clients/js-native/package.json +++ b/test/smoke/clients/js-native/package.json @@ -4,6 +4,7 @@ "type": "module", "dependencies": { "@moq/hang": "workspace:*", + "@moq/json": "workspace:*", "@moq/net": "workspace:*", "@moq/web-transport": "^0.1.2", "zod": "^4.0.0" diff --git a/test/smoke/clients/js-native/subscribe.ts b/test/smoke/clients/js-native/subscribe.ts index 8e87d655f..713164438 100644 --- a/test/smoke/clients/js-native/subscribe.ts +++ b/test/smoke/clients/js-native/subscribe.ts @@ -13,6 +13,7 @@ */ import { parseArgs } from "node:util"; import * as Catalog from "@moq/hang/catalog"; +import * as Json from "@moq/json"; import * as Moq from "@moq/net"; import { install } from "@moq/web-transport"; @@ -62,12 +63,14 @@ async function run(): Promise { const bc = connection.consume(path); - // The .hang catalog lives on the "catalog.json" track. A lazy publisher may - // announce video in a later update, so keep reading frames until one has it. - const catalog = bc.subscribe("catalog.json", Catalog.PRIORITY.catalog); + // The .hang catalog lives on the "catalog.json" track. It's a @moq/json + // snapshot+delta value, reconstructed by Json.Consumer. A lazy publisher may + // announce video in a later update, so keep reading until one has it. + const track = bc.subscribe("catalog.json", Catalog.PRIORITY.catalog); + const catalog = new Json.Consumer(track, { schema: Catalog.RootSchema }); let videoTrack: string | undefined; while (!videoTrack) { - const root = await Catalog.fetch(catalog); + const root = await catalog.next(); if (!root) throw new Error("catalog ended without a video track"); const renditions = root.video?.renditions; if (renditions) videoTrack = Object.keys(renditions)[0]; @@ -81,7 +84,7 @@ async function run(): Promise { for (;;) { const frame = await group.readFrame(); if (!frame) break; - total += frame.byteLength; + total += frame.data.byteLength; if (total > 0) { // The harness judges success by this marker, not the exit code: the // @moq/web-transport NAPI addon can segfault during the runtime's exit diff --git a/test/smoke/clients/js/driver.ts b/test/smoke/clients/js/driver.ts index 7c456e189..27bd11bed 100644 --- a/test/smoke/clients/js/driver.ts +++ b/test/smoke/clients/js/driver.ts @@ -79,9 +79,9 @@ try { while (Date.now() < deadline) { frames = await page.evaluate(() => { const w = document.querySelector("moq-watch") as unknown as { - backend?: { video?: { stats?: { peek(): { frameCount?: number } | undefined } } }; + backend?: { video?: { output?: { stats?: { peek(): { frameCount?: number } | undefined } } } }; } | null; - return w?.backend?.video?.stats?.peek()?.frameCount ?? 0; + return w?.backend?.video?.output?.stats?.peek()?.frameCount ?? 0; }); if (frames > 0) break; // The watch element gives up if it subscribes to the catalog before the diff --git a/test/smoke/clients/python/smoke.py b/test/smoke/clients/python/smoke.py index 092cc45b2..2ed841d42 100644 --- a/test/smoke/clients/python/smoke.py +++ b/test/smoke/clients/python/smoke.py @@ -47,7 +47,8 @@ async def _catalog_with_video(consumer: moq.BroadcastConsumer) -> moq.Catalog: # The catalog is a live track. A lazy publisher (e.g. the browser, which only # encodes on demand) may announce video in a *later* update, not the first # snapshot, so wait for a catalog that actually has a video track. - async for catalog in consumer.subscribe_catalog(): + catalog_consumer = await consumer.subscribe_catalog() + async for catalog in catalog_consumer: if catalog.video: return catalog raise RuntimeError("catalog stream ended without a video track")