Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions doc/concept/layer/hang.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ For example, it's not possible to have a different `flip` or `rotation` value fo
Each rendition is an extension of [VideoDecoderConfig](https://www.w3.org/TR/webcodecs/#video-decoder-config).
This is the minimum amount of information required to initialize a video decoder.

### Cross-broadcast renditions

A rendition may set an optional `broadcast` field: a path relative to the broadcast that served the catalog (e.g. `"../source"`), pointing at another broadcast that publishes the actual track.
A consumer resolves the reference against the catalog broadcast's own path (`..` pops a segment, other segments append) and subscribes to the track on the resolved broadcast over the same connection.
When the field is absent, the track lives in the same broadcast as the catalog.

This lets a transcoder publish a sidecar catalog that adds new renditions while pointing unchanged ones at the original broadcast, instead of re-publishing those bytes through the transcoder.
For example, a transcoder consuming `room/source` can publish `room/transcode` whose catalog contains a downscaled `480p` rendition plus the original `1080p` rendition marked `"broadcast": "../source"`.
A viewer of `room/transcode` then pulls `480p` from the transcoder and `1080p` directly from the source, and the relay dedupes the source subscription with the transcoder's own.

`@moq/watch` resolves the reference automatically. In Rust, the `moq-mux` exporters do the same when built from a `Source` carrying origin context (`Source::new(broadcast).with_origin(origin, path)`); a bare `BroadcastConsumer` cannot resolve one and fails with a clear error.

### Extensions

The base catalog carries only the media sections (`video` and `audio`).
Expand Down
6 changes: 6 additions & 0 deletions js/hang/src/catalog/audio.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as z from "zod/mini";
import { ContainerSchema } from "./container";
import { u53Schema } from "./integers";
import { RelativeBroadcastSchema } from "./path";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
Expand All @@ -12,6 +13,11 @@ const TrackSchema = z.object({
* Mirrors WebCodecs AudioDecoderConfig (https://w3c.github.io/webcodecs/#audio-decoder-config).
*/
export const AudioConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(RelativeBroadcastSchema),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
1 change: 1 addition & 0 deletions js/hang/src/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from "./audio";
export * from "./container";
export * from "./format";
export * from "./integers";
export * from "./path";
export * from "./priority";
export * from "./root";
export * from "./track";
Expand Down
13 changes: 13 additions & 0 deletions js/hang/src/catalog/path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Path } from "@moq/net";
import * as z from "zod/mini";

/**
* Zod schema for a relative broadcast reference stored in a catalog (a rendition's
* `broadcast` field, e.g. "../source"). Normalizes the input the same way the Rust
* `PathRelative` type does so JS and Rust agree byte-for-byte after deserialization.
* Resolve it against the catalog broadcast's own path with `Path.resolve`.
*/
export const RelativeBroadcastSchema = z.pipe(z.string(), z.transform(Path.normalizeRelative));

/** A normalized relative broadcast reference. */
export type RelativeBroadcast = z.infer<typeof RelativeBroadcastSchema>;
31 changes: 31 additions & 0 deletions js/hang/src/catalog/root.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,34 @@ test("extended schema validates app sections", () => {
// The extended schema enforces the app's section type.
expect(() => ExtendedSchema.parse({ scte35: { spliceId: "nope" } })).toThrow();
});

test("rendition broadcast reference is parsed and normalized", () => {
const catalog = {
video: {
renditions: {
video: {
broadcast: ".././source/",
codec: "avc1.64001f",
container: { kind: "legacy" },
},
},
},
};
const parsed = RootSchema.parse(catalog);
if (!parsed.video || !("renditions" in parsed.video)) throw new Error("missing video section");
// Normalized like Rust PathRelative: `.` and empty segments dropped, `..` preserved.
expect(parsed.video.renditions.video?.broadcast).toBe("../source");
});

test("rendition without broadcast reference stays undefined", () => {
const catalog = {
video: {
renditions: {
video: { codec: "avc1.64001f", container: { kind: "legacy" } },
},
},
};
const parsed = RootSchema.parse(catalog);
if (!parsed.video || !("renditions" in parsed.video)) throw new Error("missing video section");
expect(parsed.video.renditions.video?.broadcast).toBeUndefined();
});
6 changes: 6 additions & 0 deletions js/hang/src/catalog/video.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as z from "zod/mini";
import { ContainerSchema } from "./container";
import { u53Schema } from "./integers";
import { RelativeBroadcastSchema } from "./path";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
Expand All @@ -9,6 +10,11 @@ const TrackSchema = z.object({

/** Schema for a single video rendition's decoder config. Mirrors WebCodecs VideoDecoderConfig. */
export const VideoConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(RelativeBroadcastSchema),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
50 changes: 50 additions & 0 deletions js/net/src/path.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,53 @@ test("from sanitizes multiple arguments with slashes", () => {
expect(Path.from("/foo/", "/bar/", "/baz/")).toBe("foo/bar/baz" as Path.Valid);
expect(Path.from("foo//", "//bar", "baz")).toBe("foo/bar/baz" as Path.Valid);
});

test("resolve appends named segments", () => {
expect(Path.resolve(Path.from("a/b"), "c")).toBe(Path.from("a/b/c"));
expect(Path.resolve(Path.from("a/b"), "c/d")).toBe(Path.from("a/b/c/d"));
});

test("resolve with empty rel returns base", () => {
expect(Path.resolve(Path.from("a/b"), "")).toBe(Path.from("a/b"));
});

test("resolve single dotdot pops one segment", () => {
expect(Path.resolve(Path.from("a/b/c"), "../d")).toBe(Path.from("a/b/d"));
expect(Path.resolve(Path.from("a/b/c"), "..")).toBe(Path.from("a/b"));
});

test("resolve multiple dotdot pops multiple segments", () => {
expect(Path.resolve(Path.from("a/b/c"), "../../x")).toBe(Path.from("a/x"));
expect(Path.resolve(Path.from("a/b/c"), "../../../x")).toBe(Path.from("x"));
});

test("resolve excess dotdot clamps at empty", () => {
expect(Path.resolve(Path.from("a"), "../../../foo")).toBe(Path.from("foo"));
expect(Path.resolve(Path.from("a"), "..")).toBe(Path.from(""));
});

test("resolve with empty base", () => {
expect(Path.resolve(Path.empty(), "foo")).toBe(Path.from("foo"));
expect(Path.resolve(Path.empty(), "..")).toBe(Path.from(""));
});

test("resolve treats dot as a no-op", () => {
expect(Path.resolve(Path.from("a/b"), ".")).toBe(Path.from("a/b"));
expect(Path.resolve(Path.from("a/b"), "./c")).toBe(Path.from("a/b/c"));
expect(Path.resolve(Path.from("a/b"), "./../c")).toBe(Path.from("a/c"));
expect(Path.resolve(Path.from("a/b"), "foo/./bar")).toBe(Path.from("a/b/foo/bar"));
});

test("resolve self-reference via dotdot equals base", () => {
expect(Path.resolve(Path.from("a/b"), "../b")).toBe(Path.from("a/b"));
});

test("normalizeRelative drops empty and dot segments", () => {
expect(Path.normalizeRelative("")).toBe("");
expect(Path.normalizeRelative(".")).toBe("");
expect(Path.normalizeRelative("./foo")).toBe("foo");
expect(Path.normalizeRelative("foo//bar")).toBe("foo/bar");
expect(Path.normalizeRelative("foo/./bar")).toBe("foo/bar");
expect(Path.normalizeRelative("/foo/")).toBe("foo");
expect(Path.normalizeRelative("../foo")).toBe("../foo");
});
52 changes: 52 additions & 0 deletions js/net/src/path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,55 @@ export function join(path: Valid, other: Valid): Valid {
export function empty(): Valid {
return "" as Valid;
}

/**
* Normalize a relative path reference: trim leading/trailing slashes, drop empty
* segments, and drop `.` segments (no-ops, matching POSIX). `..` is preserved and
* only interpreted by {@link resolve}.
*
* Mirrors the Rust `PathRelative::new` normalization, so JS and Rust agree
* byte-for-byte on the stored form. Two callers comparing normalized strings can
* detect that `""`, `"."`, `"/./"` etc. all mean "no reference".
*/
export function normalizeRelative(rel: string): string {
return rel
.split("/")
.filter((s) => s !== "" && s !== ".")
.join("/");
}

/**
* Resolve a relative path reference against a base path.
*
* `..` segments pop the last segment of the base; other segments are appended.
* `.` and empty segments are no-ops. Excess `..` once the base is empty is also a
* no-op (subsequent named segments still append). An empty / normalized-empty `rel`
* returns the base path unchanged.
*
* Mirrors the Rust `Path::resolve`, used by hang catalogs to express
* cross-broadcast track references (a rendition's `broadcast` field).
*
* @example
* ```typescript
* Path.resolve(Path.from("a/b/c"), "../source"); // "a/b/source"
* Path.resolve(Path.from("a/b"), "x/y"); // "a/b/x/y"
* Path.resolve(Path.from("a"), "../../x"); // "x"
* Path.resolve(Path.from("a/b"), "./c"); // "a/b/c"
* ```
*/
export function resolve(base: Valid, rel: string): Valid {
const segments = base === "" ? [] : base.split("/");

for (const seg of rel.split("/")) {
if (seg === "" || seg === ".") {
continue;
}
if (seg === "..") {
segments.pop();
} else {
segments.push(seg);
}
}

return segments.join("/") as Valid;
}
4 changes: 3 additions & 1 deletion js/watch/src/audio/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ export class Decoder {
const config = effect.get(this.source.output.config);
if (!config) return;

const active = effect.get(broadcast.output.active);
// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const sub = active.track(track).subscribe({ priority: Catalog.PRIORITY.audio });
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/audio/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.input.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.output.active);
if (!active) return;

const track = effect.get(this.source.output.track);
if (!track) return;

const config = effect.get(this.source.output.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `audio/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
71 changes: 63 additions & 8 deletions js/watch/src/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,28 @@ export class Broadcast {
}

#runAnnouncedNow(effect: Effect): void {
const name = effect.get(this.input.name);
this.#announcedNow.set(this.#isPathAnnounced(effect, name));
}

// Whether `name` is currently announced on the connection (or skipping the check
// because reload is off or the relay doesn't support announcements). Used by both
// `#runAnnouncedNow` (for `input.name`) and `#override` (for cross-broadcast refs).
#isPathAnnounced(effect: Effect, name: Moq.Path.Valid): boolean {
const reload = effect.get(this.input.reload);
if (!reload) {
this.#announcedNow.set(true);
return;
}
if (!reload) return true;

// Cloudflare's relay does not yet support announcement subscriptions,
// so an announcement will never arrive. Fall back to subscribing
// immediately (reload=false behaviour) instead of waiting forever.
const conn = effect.get(this.input.connection);
if (conn?.url.hostname.endsWith("mediaoverquic.com")) {
console.warn("Cloudflare relay does not support broadcast discovery yet; ignoring reload signal.");
this.#announcedNow.set(true);
return;
return true;
}

const name = effect.get(this.input.name);
const announced = effect.get(this.#announced);
this.#announcedNow.set(announced.has(name));
return announced.has(name);
}

#runBroadcast(effect: Effect): void {
Expand Down Expand Up @@ -195,6 +198,58 @@ export class Broadcast {
});
}

/**
* Resolve the `Moq.Broadcast` that publishes a given track.
*
* If `rel` is set (a rendition's catalog `broadcast` field), treat it as a path
* relative to this broadcast's name and consume the resolved broadcast on the same
* connection. Otherwise return the catalog's own active broadcast.
*
* Override broadcasts are cached per resolved path and owned by this Broadcast's
* `signals`; the caller's `effect` only subscribes to the cached signal. Many
* renditions referencing the same source thus share one underlying subscription,
* and the override outlives any single caller effect.
*/
trackBroadcast(effect: Effect, rel: string | undefined): Moq.Broadcast | undefined {
if (!rel) return effect.get(this.output.active);

const base = effect.get(this.input.name);
const resolved = Path.resolve(base, rel);

// A reference that walks back to the catalog's own broadcast (or resolves to
// the empty root, via excess `..`) is served by the catalog broadcast itself,
// avoiding a duplicate subscription on the same path.
if (resolved === base || resolved === Path.empty()) return effect.get(this.output.active);

return effect.get(this.#override(resolved));
}

#overrides = new Map<Moq.Path.Valid, Signal<Moq.Broadcast | undefined>>();

#override(path: Moq.Path.Valid): Signal<Moq.Broadcast | undefined> {
const cached = this.#overrides.get(path);
if (cached) return cached;

const signal = new Signal<Moq.Broadcast | undefined>(undefined);
this.#overrides.set(path, signal);

this.signals.run((effect) => {
const enabled = effect.get(this.input.enabled);
if (!enabled) return;

const conn = effect.get(this.input.connection);
if (!conn) return;

if (!this.#isPathAnnounced(effect, path)) return;

const broadcast = conn.consume(path);
effect.cleanup(() => broadcast.close());
effect.set(signal, broadcast, undefined);
});

return signal;
}

close() {
this.signals.close();
}
Expand Down
4 changes: 3 additions & 1 deletion js/watch/src/video/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ export class Decoder implements Backend {
}
const [_, broadcast, track, config] = values;

const active: Moq.Broadcast | undefined = effect.get(broadcast.output.active);
// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's own broadcast.
const active: Moq.Broadcast | undefined = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) {
// Going offline should clear the last rendered frame.
this.#active.set(undefined);
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/video/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.input.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.output.active);
if (!active) return;

const track = effect.get(this.source.output.track);
if (!track) return;

const config = effect.get(this.source.output.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `video/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
Loading