Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 30 additions & 12 deletions test/smoke/clients/c/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct {
pthread_cond_t cv;
int got; // a non-empty frame arrived
int video_started; // guard: start the video track only once
int32_t broadcast; // handle delivered by moq_origin_consume_announced (0 until it arrives)
} ctx_t;

// Callbacks run on libmoq's runtime thread; main waits on the condvar. ctx
Expand All @@ -47,6 +48,19 @@ static void on_frame(void *ud, int32_t frame) {
moq_consume_frame_close((uint32_t)frame);
}

// Delivers the broadcast handle once it's announced, then once more with a
// terminal code (<= 0) we ignore. Store the first positive handle and wake main.
static void on_broadcast(void *ud, int32_t broadcast) {
ctx_t *c = (ctx_t *)ud;
if (broadcast <= 0) return; // 0 = ended, negative = error
pthread_mutex_lock(&c->mu);
if (c->broadcast <= 0) {
c->broadcast = broadcast;
pthread_cond_signal(&c->cv);
}
pthread_mutex_unlock(&c->mu);
}

static void on_catalog(void *ud, int32_t catalog) {
ctx_t *c = (ctx_t *)ud;
if (catalog <= 0) return;
Expand Down Expand Up @@ -91,6 +105,7 @@ int main(int argc, char **argv) {
pthread_cond_init(&c.cv, NULL);
c.got = 0;
c.video_started = 0;
c.broadcast = 0;

int32_t origin = moq_origin_create();
if (origin <= 0) {
Expand All @@ -109,20 +124,23 @@ int main(int argc, char **argv) {
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += (time_t)timeout_s;

// moq_origin_consume is a synchronous lookup, but the broadcast arrives over
// the network after connect. Retry until it's announced (or we run out of
// time). We don't enable libmoq logging, so the misses stay quiet.
int32_t bc = -1;
while (1) {
bc = moq_origin_consume((uint32_t)origin, broadcast, strlen(broadcast));
if (bc > 0) break;
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
if (now.tv_sec >= deadline.tv_sec) break;
usleep(150 * 1000);
// The broadcast arrives over the network after connect, so wait for it to be
// announced. moq_origin_consume_announced resolves via on_broadcast once it's
// available; we block on the condvar until then (or the deadline).
int32_t wait = moq_origin_consume_announced((uint32_t)origin, broadcast, strlen(broadcast), on_broadcast, &c);
if (wait <= 0) {
fprintf(stderr, "error: moq_origin_consume_announced failed: %d\n", wait);
return 1;
}

pthread_mutex_lock(&c.mu);
while (c.broadcast <= 0) {
if (pthread_cond_timedwait(&c.cv, &c.mu, &deadline) != 0) break; // timed out
}
int32_t bc = c.broadcast;
pthread_mutex_unlock(&c.mu);
if (bc <= 0) {
fprintf(stderr, "error: broadcast never announced (moq_origin_consume: %d)\n", bc);
fprintf(stderr, "error: broadcast never announced\n");
return 1;
}

Expand Down
1 change: 1 addition & 0 deletions test/smoke/clients/js-native/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"type": "module",
"dependencies": {
"@moq/hang": "workspace:*",
"@moq/json": "workspace:*",
"@moq/net": "workspace:*",
"@moq/web-transport": "^0.1.2",
"zod": "^4.0.0"
Expand Down
13 changes: 8 additions & 5 deletions test/smoke/clients/js-native/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
import { parseArgs } from "node:util";
import * as Catalog from "@moq/hang/catalog";
import * as Json from "@moq/json";
import * as Moq from "@moq/net";
import { install } from "@moq/web-transport";

Expand Down Expand Up @@ -62,12 +63,14 @@ async function run(): Promise<void> {

const bc = connection.consume(path);

// The .hang catalog lives on the "catalog.json" track. A lazy publisher may
// announce video in a later update, so keep reading frames until one has it.
const catalog = bc.subscribe("catalog.json", Catalog.PRIORITY.catalog);
// The .hang catalog lives on the "catalog.json" track. It's a @moq/json
// snapshot+delta value, reconstructed by Json.Consumer. A lazy publisher may
// announce video in a later update, so keep reading until one has it.
const track = bc.subscribe("catalog.json", Catalog.PRIORITY.catalog);
const catalog = new Json.Consumer<Catalog.Root>(track, { schema: Catalog.RootSchema });
let videoTrack: string | undefined;
while (!videoTrack) {
const root = await Catalog.fetch(catalog);
const root = await catalog.next();
if (!root) throw new Error("catalog ended without a video track");
const renditions = root.video?.renditions;
if (renditions) videoTrack = Object.keys(renditions)[0];
Expand All @@ -81,7 +84,7 @@ async function run(): Promise<void> {
for (;;) {
const frame = await group.readFrame();
if (!frame) break;
total += frame.byteLength;
total += frame.data.byteLength;
if (total > 0) {
// The harness judges success by this marker, not the exit code: the
// @moq/web-transport NAPI addon can segfault during the runtime's exit
Expand Down
4 changes: 2 additions & 2 deletions test/smoke/clients/js/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ try {
while (Date.now() < deadline) {
frames = await page.evaluate(() => {
const w = document.querySelector("moq-watch") as unknown as {
backend?: { video?: { stats?: { peek(): { frameCount?: number } | undefined } } };
backend?: { video?: { output?: { stats?: { peek(): { frameCount?: number } | undefined } } } };
} | null;
return w?.backend?.video?.stats?.peek()?.frameCount ?? 0;
return w?.backend?.video?.output?.stats?.peek()?.frameCount ?? 0;
});
if (frames > 0) break;
// The watch element gives up if it subscribes to the catalog before the
Expand Down
3 changes: 2 additions & 1 deletion test/smoke/clients/python/smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ async def _catalog_with_video(consumer: moq.BroadcastConsumer) -> moq.Catalog:
# The catalog is a live track. A lazy publisher (e.g. the browser, which only
# encodes on demand) may announce video in a *later* update, not the first
# snapshot, so wait for a catalog that actually has a video track.
async for catalog in consumer.subscribe_catalog():
catalog_consumer = await consumer.subscribe_catalog()
async for catalog in catalog_consumer:
if catalog.video:
return catalog
raise RuntimeError("catalog stream ended without a video track")
Expand Down
Loading