diff --git a/doc/concept/layer/hang.md b/doc/concept/layer/hang.md index 8eda4974b..6d5416aa8 100644 --- a/doc/concept/layer/hang.md +++ b/doc/concept/layer/hang.md @@ -75,6 +75,18 @@ For example, it's not possible to have a different `flip` or `rotation` value fo Each rendition is an extension of [VideoDecoderConfig](https://www.w3.org/TR/webcodecs/#video-decoder-config). This is the minimum amount of information required to initialize a video decoder. +### Cross-broadcast renditions + +A rendition may set an optional `broadcast` field: a path relative to the broadcast that served the catalog (e.g. `"../source"`), pointing at another broadcast that publishes the actual track. +A consumer resolves the reference against the catalog broadcast's own path (`..` pops a segment, other segments append) and subscribes to the track on the resolved broadcast over the same connection. +When the field is absent, the track lives in the same broadcast as the catalog. + +This lets a transcoder publish a sidecar catalog that adds new renditions while pointing unchanged ones at the original broadcast, instead of re-publishing those bytes through the transcoder. +For example, a transcoder consuming `room/source` can publish `room/transcode` whose catalog contains a downscaled `480p` rendition plus the original `1080p` rendition marked `"broadcast": "../source"`. +A viewer of `room/transcode` then pulls `480p` from the transcoder and `1080p` directly from the source, and the relay dedupes the source subscription with the transcoder's own. + +`@moq/watch` resolves the reference automatically. In Rust, the `moq-mux` exporters do the same when built from a `Source` carrying origin context (`Source::new(broadcast).with_origin(origin, path)`); a bare `BroadcastConsumer` cannot resolve one and fails with a clear error. + ### Extensions The base catalog carries only the media sections (`video` and `audio`). diff --git a/js/hang/src/catalog/audio.ts b/js/hang/src/catalog/audio.ts index 1cedcd586..cdd810267 100644 --- a/js/hang/src/catalog/audio.ts +++ b/js/hang/src/catalog/audio.ts @@ -1,6 +1,7 @@ import * as z from "zod/mini"; import { ContainerSchema } from "./container"; import { u53Schema } from "./integers"; +import { RelativeBroadcastSchema } from "./path"; // Backwards compatibility: old track schema const TrackSchema = z.object({ @@ -12,6 +13,11 @@ const TrackSchema = z.object({ * Mirrors WebCodecs AudioDecoderConfig (https://w3c.github.io/webcodecs/#audio-decoder-config). */ export const AudioConfigSchema = z.object({ + // Optional reference to another broadcast that publishes this track, expressed + // relative to the broadcast that served this catalog (e.g. "../source"). + // If unset, the track lives in the same broadcast as the catalog. + broadcast: z.optional(RelativeBroadcastSchema), + // See: https://w3c.github.io/webcodecs/codec_registry.html codec: z.string(), diff --git a/js/hang/src/catalog/index.ts b/js/hang/src/catalog/index.ts index 6a0c787b5..b7561952b 100644 --- a/js/hang/src/catalog/index.ts +++ b/js/hang/src/catalog/index.ts @@ -9,6 +9,7 @@ export * from "./audio"; export * from "./container"; export * from "./format"; export * from "./integers"; +export * from "./path"; export * from "./priority"; export * from "./root"; export * from "./track"; diff --git a/js/hang/src/catalog/path.ts b/js/hang/src/catalog/path.ts new file mode 100644 index 000000000..002f6bb1c --- /dev/null +++ b/js/hang/src/catalog/path.ts @@ -0,0 +1,13 @@ +import { Path } from "@moq/net"; +import * as z from "zod/mini"; + +/** + * Zod schema for a relative broadcast reference stored in a catalog (a rendition's + * `broadcast` field, e.g. "../source"). Normalizes the input the same way the Rust + * `PathRelative` type does so JS and Rust agree byte-for-byte after deserialization. + * Resolve it against the catalog broadcast's own path with `Path.resolve`. + */ +export const RelativeBroadcastSchema = z.pipe(z.string(), z.transform(Path.normalizeRelative)); + +/** A normalized relative broadcast reference. */ +export type RelativeBroadcast = z.infer; diff --git a/js/hang/src/catalog/root.test.ts b/js/hang/src/catalog/root.test.ts index 5e2a2f3bc..bc3a2eb20 100644 --- a/js/hang/src/catalog/root.test.ts +++ b/js/hang/src/catalog/root.test.ts @@ -21,3 +21,34 @@ test("extended schema validates app sections", () => { // The extended schema enforces the app's section type. expect(() => ExtendedSchema.parse({ scte35: { spliceId: "nope" } })).toThrow(); }); + +test("rendition broadcast reference is parsed and normalized", () => { + const catalog = { + video: { + renditions: { + video: { + broadcast: ".././source/", + codec: "avc1.64001f", + container: { kind: "legacy" }, + }, + }, + }, + }; + const parsed = RootSchema.parse(catalog); + if (!parsed.video || !("renditions" in parsed.video)) throw new Error("missing video section"); + // Normalized like Rust PathRelative: `.` and empty segments dropped, `..` preserved. + expect(parsed.video.renditions.video?.broadcast).toBe("../source"); +}); + +test("rendition without broadcast reference stays undefined", () => { + const catalog = { + video: { + renditions: { + video: { codec: "avc1.64001f", container: { kind: "legacy" } }, + }, + }, + }; + const parsed = RootSchema.parse(catalog); + if (!parsed.video || !("renditions" in parsed.video)) throw new Error("missing video section"); + expect(parsed.video.renditions.video?.broadcast).toBeUndefined(); +}); diff --git a/js/hang/src/catalog/video.ts b/js/hang/src/catalog/video.ts index 54b7665dc..56ce50105 100644 --- a/js/hang/src/catalog/video.ts +++ b/js/hang/src/catalog/video.ts @@ -1,6 +1,7 @@ import * as z from "zod/mini"; import { ContainerSchema } from "./container"; import { u53Schema } from "./integers"; +import { RelativeBroadcastSchema } from "./path"; // Backwards compatibility: old track schema const TrackSchema = z.object({ @@ -9,6 +10,11 @@ const TrackSchema = z.object({ /** Schema for a single video rendition's decoder config. Mirrors WebCodecs VideoDecoderConfig. */ export const VideoConfigSchema = z.object({ + // Optional reference to another broadcast that publishes this track, expressed + // relative to the broadcast that served this catalog (e.g. "../source"). + // If unset, the track lives in the same broadcast as the catalog. + broadcast: z.optional(RelativeBroadcastSchema), + // See: https://w3c.github.io/webcodecs/codec_registry.html codec: z.string(), diff --git a/js/net/src/path.test.ts b/js/net/src/path.test.ts index 6c6e81d03..d4dba74c1 100644 --- a/js/net/src/path.test.ts +++ b/js/net/src/path.test.ts @@ -189,3 +189,53 @@ test("from sanitizes multiple arguments with slashes", () => { expect(Path.from("/foo/", "/bar/", "/baz/")).toBe("foo/bar/baz" as Path.Valid); expect(Path.from("foo//", "//bar", "baz")).toBe("foo/bar/baz" as Path.Valid); }); + +test("resolve appends named segments", () => { + expect(Path.resolve(Path.from("a/b"), "c")).toBe(Path.from("a/b/c")); + expect(Path.resolve(Path.from("a/b"), "c/d")).toBe(Path.from("a/b/c/d")); +}); + +test("resolve with empty rel returns base", () => { + expect(Path.resolve(Path.from("a/b"), "")).toBe(Path.from("a/b")); +}); + +test("resolve single dotdot pops one segment", () => { + expect(Path.resolve(Path.from("a/b/c"), "../d")).toBe(Path.from("a/b/d")); + expect(Path.resolve(Path.from("a/b/c"), "..")).toBe(Path.from("a/b")); +}); + +test("resolve multiple dotdot pops multiple segments", () => { + expect(Path.resolve(Path.from("a/b/c"), "../../x")).toBe(Path.from("a/x")); + expect(Path.resolve(Path.from("a/b/c"), "../../../x")).toBe(Path.from("x")); +}); + +test("resolve excess dotdot clamps at empty", () => { + expect(Path.resolve(Path.from("a"), "../../../foo")).toBe(Path.from("foo")); + expect(Path.resolve(Path.from("a"), "..")).toBe(Path.from("")); +}); + +test("resolve with empty base", () => { + expect(Path.resolve(Path.empty(), "foo")).toBe(Path.from("foo")); + expect(Path.resolve(Path.empty(), "..")).toBe(Path.from("")); +}); + +test("resolve treats dot as a no-op", () => { + expect(Path.resolve(Path.from("a/b"), ".")).toBe(Path.from("a/b")); + expect(Path.resolve(Path.from("a/b"), "./c")).toBe(Path.from("a/b/c")); + expect(Path.resolve(Path.from("a/b"), "./../c")).toBe(Path.from("a/c")); + expect(Path.resolve(Path.from("a/b"), "foo/./bar")).toBe(Path.from("a/b/foo/bar")); +}); + +test("resolve self-reference via dotdot equals base", () => { + expect(Path.resolve(Path.from("a/b"), "../b")).toBe(Path.from("a/b")); +}); + +test("normalizeRelative drops empty and dot segments", () => { + expect(Path.normalizeRelative("")).toBe(""); + expect(Path.normalizeRelative(".")).toBe(""); + expect(Path.normalizeRelative("./foo")).toBe("foo"); + expect(Path.normalizeRelative("foo//bar")).toBe("foo/bar"); + expect(Path.normalizeRelative("foo/./bar")).toBe("foo/bar"); + expect(Path.normalizeRelative("/foo/")).toBe("foo"); + expect(Path.normalizeRelative("../foo")).toBe("../foo"); +}); diff --git a/js/net/src/path.ts b/js/net/src/path.ts index 16c3c631a..c2fe89b03 100644 --- a/js/net/src/path.ts +++ b/js/net/src/path.ts @@ -137,3 +137,55 @@ export function join(path: Valid, other: Valid): Valid { export function empty(): Valid { return "" as Valid; } + +/** + * Normalize a relative path reference: trim leading/trailing slashes, drop empty + * segments, and drop `.` segments (no-ops, matching POSIX). `..` is preserved and + * only interpreted by {@link resolve}. + * + * Mirrors the Rust `PathRelative::new` normalization, so JS and Rust agree + * byte-for-byte on the stored form. Two callers comparing normalized strings can + * detect that `""`, `"."`, `"/./"` etc. all mean "no reference". + */ +export function normalizeRelative(rel: string): string { + return rel + .split("/") + .filter((s) => s !== "" && s !== ".") + .join("/"); +} + +/** + * Resolve a relative path reference against a base path. + * + * `..` segments pop the last segment of the base; other segments are appended. + * `.` and empty segments are no-ops. Excess `..` once the base is empty is also a + * no-op (subsequent named segments still append). An empty / normalized-empty `rel` + * returns the base path unchanged. + * + * Mirrors the Rust `Path::resolve`, used by hang catalogs to express + * cross-broadcast track references (a rendition's `broadcast` field). + * + * @example + * ```typescript + * Path.resolve(Path.from("a/b/c"), "../source"); // "a/b/source" + * Path.resolve(Path.from("a/b"), "x/y"); // "a/b/x/y" + * Path.resolve(Path.from("a"), "../../x"); // "x" + * Path.resolve(Path.from("a/b"), "./c"); // "a/b/c" + * ``` + */ +export function resolve(base: Valid, rel: string): Valid { + const segments = base === "" ? [] : base.split("/"); + + for (const seg of rel.split("/")) { + if (seg === "" || seg === ".") { + continue; + } + if (seg === "..") { + segments.pop(); + } else { + segments.push(seg); + } + } + + return segments.join("/") as Valid; +} diff --git a/js/watch/src/audio/decoder.ts b/js/watch/src/audio/decoder.ts index f69b10d1d..532b31067 100644 --- a/js/watch/src/audio/decoder.ts +++ b/js/watch/src/audio/decoder.ts @@ -202,7 +202,9 @@ export class Decoder { const config = effect.get(this.source.output.config); if (!config) return; - const active = effect.get(broadcast.output.active); + // Honor a per-rendition `broadcast` override: subscribe on the resolved source + // broadcast instead of the catalog's own broadcast. + const active = broadcast.trackBroadcast(effect, config.broadcast); if (!active) return; const sub = active.track(track).subscribe({ priority: Catalog.PRIORITY.audio }); diff --git a/js/watch/src/audio/mse.ts b/js/watch/src/audio/mse.ts index a7a9793c3..2981cbd8a 100644 --- a/js/watch/src/audio/mse.ts +++ b/js/watch/src/audio/mse.ts @@ -62,15 +62,17 @@ export class Mse implements Backend { const broadcast = effect.get(this.source.input.broadcast); if (!broadcast) return; - const active = effect.get(broadcast.output.active); - if (!active) return; - const track = effect.get(this.source.output.track); if (!track) return; const config = effect.get(this.source.output.config); if (!config) return; + // Honor a per-rendition `broadcast` override: subscribe on the resolved source + // broadcast instead of the catalog's own broadcast. + const active = broadcast.trackBroadcast(effect, config.broadcast); + if (!active) return; + const mime = `audio/mp4; codecs="${config.codec}"`; const sourceBuffer = mediaSource.addSourceBuffer(mime); diff --git a/js/watch/src/broadcast.ts b/js/watch/src/broadcast.ts index 4d5964ea1..26f1db672 100644 --- a/js/watch/src/broadcast.ts +++ b/js/watch/src/broadcast.ts @@ -94,11 +94,16 @@ export class Broadcast { } #runAnnouncedNow(effect: Effect): void { + const name = effect.get(this.input.name); + this.#announcedNow.set(this.#isPathAnnounced(effect, name)); + } + + // Whether `name` is currently announced on the connection (or skipping the check + // because reload is off or the relay doesn't support announcements). Used by both + // `#runAnnouncedNow` (for `input.name`) and `#override` (for cross-broadcast refs). + #isPathAnnounced(effect: Effect, name: Moq.Path.Valid): boolean { const reload = effect.get(this.input.reload); - if (!reload) { - this.#announcedNow.set(true); - return; - } + if (!reload) return true; // Cloudflare's relay does not yet support announcement subscriptions, // so an announcement will never arrive. Fall back to subscribing @@ -106,13 +111,11 @@ export class Broadcast { const conn = effect.get(this.input.connection); if (conn?.url.hostname.endsWith("mediaoverquic.com")) { console.warn("Cloudflare relay does not support broadcast discovery yet; ignoring reload signal."); - this.#announcedNow.set(true); - return; + return true; } - const name = effect.get(this.input.name); const announced = effect.get(this.#announced); - this.#announcedNow.set(announced.has(name)); + return announced.has(name); } #runBroadcast(effect: Effect): void { @@ -195,6 +198,58 @@ export class Broadcast { }); } + /** + * Resolve the `Moq.Broadcast` that publishes a given track. + * + * If `rel` is set (a rendition's catalog `broadcast` field), treat it as a path + * relative to this broadcast's name and consume the resolved broadcast on the same + * connection. Otherwise return the catalog's own active broadcast. + * + * Override broadcasts are cached per resolved path and owned by this Broadcast's + * `signals`; the caller's `effect` only subscribes to the cached signal. Many + * renditions referencing the same source thus share one underlying subscription, + * and the override outlives any single caller effect. + */ + trackBroadcast(effect: Effect, rel: string | undefined): Moq.Broadcast | undefined { + if (!rel) return effect.get(this.output.active); + + const base = effect.get(this.input.name); + const resolved = Path.resolve(base, rel); + + // A reference that walks back to the catalog's own broadcast (or resolves to + // the empty root, via excess `..`) is served by the catalog broadcast itself, + // avoiding a duplicate subscription on the same path. + if (resolved === base || resolved === Path.empty()) return effect.get(this.output.active); + + return effect.get(this.#override(resolved)); + } + + #overrides = new Map>(); + + #override(path: Moq.Path.Valid): Signal { + const cached = this.#overrides.get(path); + if (cached) return cached; + + const signal = new Signal(undefined); + this.#overrides.set(path, signal); + + this.signals.run((effect) => { + const enabled = effect.get(this.input.enabled); + if (!enabled) return; + + const conn = effect.get(this.input.connection); + if (!conn) return; + + if (!this.#isPathAnnounced(effect, path)) return; + + const broadcast = conn.consume(path); + effect.cleanup(() => broadcast.close()); + effect.set(signal, broadcast, undefined); + }); + + return signal; + } + close() { this.signals.close(); } diff --git a/js/watch/src/video/decoder.ts b/js/watch/src/video/decoder.ts index 88f6a46c0..93faadba1 100644 --- a/js/watch/src/video/decoder.ts +++ b/js/watch/src/video/decoder.ts @@ -98,7 +98,9 @@ export class Decoder implements Backend { } const [_, broadcast, track, config] = values; - const active: Moq.Broadcast | undefined = effect.get(broadcast.output.active); + // Honor a per-rendition `broadcast` override: subscribe on the resolved source + // broadcast instead of the catalog's own broadcast. + const active: Moq.Broadcast | undefined = broadcast.trackBroadcast(effect, config.broadcast); if (!active) { // Going offline should clear the last rendered frame. this.#active.set(undefined); diff --git a/js/watch/src/video/mse.ts b/js/watch/src/video/mse.ts index 136f0c02e..0101febd3 100644 --- a/js/watch/src/video/mse.ts +++ b/js/watch/src/video/mse.ts @@ -56,15 +56,17 @@ export class Mse implements Backend { const broadcast = effect.get(this.source.input.broadcast); if (!broadcast) return; - const active = effect.get(broadcast.output.active); - if (!active) return; - const track = effect.get(this.source.output.track); if (!track) return; const config = effect.get(this.source.output.config); if (!config) return; + // Honor a per-rendition `broadcast` override: subscribe on the resolved source + // broadcast instead of the catalog's own broadcast. + const active = broadcast.trackBroadcast(effect, config.broadcast); + if (!active) return; + const mime = `video/mp4; codecs="${config.codec}"`; const sourceBuffer = mediaSource.addSourceBuffer(mime); diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index e170da503..599c4b0a2 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -61,6 +61,12 @@ impl Audio { #[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct AudioConfig { + /// Optional reference to another broadcast that publishes this track, expressed + /// relative to the broadcast that served this catalog (e.g. `../source`). If unset, + /// the track lives in the same broadcast as the catalog. + #[serde(default)] + pub broadcast: Option, + // The codec, see the registry for details: // https://w3c.github.io/webcodecs/codec_registry.html #[serde_as(as = "DisplayFromStr")] @@ -110,6 +116,7 @@ impl AudioConfig { /// since the type is `#[non_exhaustive]`. pub fn new(codec: impl Into, sample_rate: u32, channel_count: u32) -> Self { Self { + broadcast: None, codec: codec.into(), sample_rate, channel_count, diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index a5bb28ab7..1c0f5969c 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -197,6 +197,7 @@ mod test { video_renditions.insert( "video".to_string(), VideoConfig { + broadcast: None, codec: H264 { profile: 0x64, constraints: 0x00, @@ -221,6 +222,7 @@ mod test { audio_renditions.insert( "audio".to_string(), AudioConfig { + broadcast: None, codec: Opus, sample_rate: 48_000, channel_count: 2, @@ -250,6 +252,94 @@ mod test { assert_eq!(encoded, output, "encode mismatch"); } + #[test] + fn rendition_with_broadcast_override() { + // Decode a catalog where one rendition references a track in a sibling broadcast, + // and verify the `broadcast` field round-trips through serde. + let encoded = r#"{ + "video": { + "renditions": { + "video": { + "broadcast": "../source", + "codec": "avc1.64001f", + "codedWidth": 1280, + "codedHeight": 720, + "container": {"kind": "legacy"} + } + } + } + }"#; + + let parsed = Catalog::from_str(encoded).expect("failed to decode"); + let rendition = parsed.video.renditions.get("video").expect("missing rendition"); + assert_eq!( + rendition.broadcast.as_ref().map(|p| p.as_str()), + Some("../source"), + "broadcast field did not deserialize" + ); + + // Full encode -> decode -> equality, so the test catches any encoder regression + // (e.g. wrong key, double-emission, or `null` instead of skip). + let output = parsed.to_string().expect("failed to encode"); + let reparsed = Catalog::from_str(&output).expect("failed to re-decode"); + assert_eq!(parsed, reparsed, "re-encoded catalog did not round-trip"); + } + + #[test] + fn rendition_without_broadcast_omits_field() { + // `broadcast: None` must NOT serialize as `"broadcast":null`, otherwise the wire + // format silently changes for every catalog that doesn't use cross-broadcast refs. + let mut video_config = VideoConfig::new(H264 { + profile: 0x64, + constraints: 0x00, + level: 0x1f, + inline: false, + }); + video_config.container = Container::Legacy; + + let mut renditions = BTreeMap::new(); + renditions.insert("video".to_string(), video_config); + + let catalog = Catalog { + video: Video { + renditions, + ..Default::default() + }, + ..Default::default() + }; + + let output = catalog.to_string().expect("failed to encode"); + assert!( + !output.contains("broadcast"), + "broadcast field leaked into JSON when None: {output}" + ); + } + + #[test] + fn rendition_with_empty_broadcast_normalizes() { + // An empty-string broadcast field should normalize to an empty PathRelative so the + // consumer can treat it identically to a missing field. + let encoded = r#"{ + "video": { + "renditions": { + "video": { + "broadcast": "", + "codec": "avc1.64001f", + "container": {"kind": "legacy"} + } + } + } + }"#; + + let parsed = Catalog::from_str(encoded).expect("failed to decode"); + let rendition = parsed.video.renditions.get("video").expect("missing rendition"); + assert_eq!( + rendition.broadcast.as_ref().map(|p| p.is_empty()), + Some(true), + "empty broadcast should deserialize as Some(empty)" + ); + } + #[test] fn extension_roundtrip() { // An application extends the catalog with its own root section by flattening Catalog. diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index 0db581a25..28d0fb329 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -90,6 +90,15 @@ pub struct Display { #[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct VideoConfig { + /// Optional reference to another broadcast that publishes this track, expressed + /// relative to the broadcast that served this catalog (e.g. `../source`). If unset, + /// the track lives in the same broadcast as the catalog. + /// + /// This allows a transcoder to author a downstream catalog that points unchanged + /// renditions at the source broadcast without re-publishing the bytes. + #[serde(default)] + pub broadcast: Option, + /// The codec, see the registry for details: /// #[serde_as(as = "DisplayFromStr")] @@ -161,6 +170,7 @@ impl VideoConfig { /// since the type is `#[non_exhaustive]`. pub fn new(codec: impl Into) -> Self { Self { + broadcast: None, codec: codec.into(), description: None, coded_width: None, diff --git a/rs/moq-cli/src/main.rs b/rs/moq-cli/src/main.rs index c372f7328..8333b6466 100644 --- a/rs/moq-cli/src/main.rs +++ b/rs/moq-cli/src/main.rs @@ -307,12 +307,16 @@ async fn run_announced_subscribe( ) -> anyhow::Result<()> { let catalog = args.catalog_format(&broadcast); - let consumer = consumer + let announced = consumer .announced_broadcast(&broadcast) .await .ok_or_else(|| anyhow::anyhow!("origin closed before broadcast was announced"))?; - Subscribe::new(consumer, catalog, args).run().await + // Keep the origin around so catalogs referencing sibling broadcasts (a rendition's + // `broadcast` field, e.g. "../source") can be resolved. + let source = moq_mux::Source::new(announced).with_origin(consumer, &broadcast); + + Subscribe::new(source, catalog, args).run().await } async fn run_hls_export( diff --git a/rs/moq-cli/src/subscribe.rs b/rs/moq-cli/src/subscribe.rs index be494eeaa..6ea6b1527 100644 --- a/rs/moq-cli/src/subscribe.rs +++ b/rs/moq-cli/src/subscribe.rs @@ -2,7 +2,6 @@ use std::time::Duration; use clap::ValueEnum; use hang::catalog::{AudioCodecKind, VideoCodecKind}; -use hang::moq_net; use moq_mux::catalog::{self, CatalogFormat, FilterAudio, FilterVideo, Stream, TargetAudio, TargetVideo}; use tokio::io::AsyncWriteExt; @@ -214,15 +213,15 @@ impl SubscribeArgs { } pub struct Subscribe { - broadcast: moq_net::BroadcastConsumer, + source: moq_mux::Source, catalog: CatalogFormat, args: SubscribeArgs, } impl Subscribe { - pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: CatalogFormat, args: SubscribeArgs) -> Self { + pub fn new(source: impl Into, catalog: CatalogFormat, args: SubscribeArgs) -> Self { Self { - broadcast, + source: source.into(), catalog, args, } @@ -230,7 +229,7 @@ impl Subscribe { /// Build the catalog stream from the configured filter/target flags. async fn stream(&self) -> anyhow::Result>> { - let consumer = catalog::Consumer::new(&self.broadcast, self.catalog).await?; + let consumer = catalog::Consumer::new(self.source.broadcast(), self.catalog).await?; let mut filter = consumer.filter(); filter.set_video(self.args.filter_video()?); @@ -258,7 +257,7 @@ impl Subscribe { let mut stdout = tokio::io::stdout(); let stream = self.stream().await?; - let mut fmp4 = moq_mux::container::fmp4::Export::new(self.broadcast.clone(), stream) + let mut fmp4 = moq_mux::container::fmp4::Export::new(self.source.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -274,7 +273,7 @@ impl Subscribe { let mut stdout = tokio::io::stdout(); let stream = self.stream().await?; - let mut mkv = moq_mux::container::mkv::Export::new(self.broadcast.clone(), stream) + let mut mkv = moq_mux::container::mkv::Export::new(self.source.clone(), stream) .with_latency(self.args.max_latency) .with_fragment_duration(self.args.fragment_duration); @@ -291,7 +290,7 @@ impl Subscribe { let stream = self.stream().await?; let mut h264 = - moq_mux::codec::h264::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + moq_mux::codec::h264::Export::new(self.source.clone(), stream).with_latency(self.args.max_latency); while let Some(chunk) = h264.next().await? { stdout.write_all(&chunk).await?; @@ -306,7 +305,7 @@ impl Subscribe { let stream = self.stream().await?; let mut h265 = - moq_mux::codec::h265::Export::new(self.broadcast.clone(), stream).with_latency(self.args.max_latency); + moq_mux::codec::h265::Export::new(self.source.clone(), stream).with_latency(self.args.max_latency); while let Some(chunk) = h265.next().await? { stdout.write_all(&chunk).await?; @@ -324,7 +323,7 @@ impl Subscribe { // is re-framed as ADTS. `fragment_duration` does not apply to TS. `with_ts` // selects the `mpegts` catalog extension so undecoded elementary streams // (SCTE-35, teletext, DVB AC-3, ...) are re-emitted verbatim on their PIDs. - let mut ts = moq_mux::container::ts::Export::with_ts(self.broadcast, self.catalog) + let mut ts = moq_mux::container::ts::Export::with_ts(self.source, self.catalog) .await? .with_latency(self.args.max_latency); @@ -343,7 +342,7 @@ impl Subscribe { // frame interleaved by timestamp. Avc3 sources are transcoded to avc1 shape // internally (synthesizing avcC from inline parameter sets). Only H.264 video // and AAC audio are supported; `fragment_duration` does not apply to FLV. - let mut flv = moq_mux::container::flv::Export::with_catalog_format(self.broadcast, self.catalog) + let mut flv = moq_mux::container::flv::Export::with_catalog_format(self.source, self.catalog) .await? .with_latency(self.args.max_latency); diff --git a/rs/moq-hls/src/export/mod.rs b/rs/moq-hls/src/export/mod.rs index 6db5ed579..0a0ff8064 100644 --- a/rs/moq-hls/src/export/mod.rs +++ b/rs/moq-hls/src/export/mod.rs @@ -61,8 +61,8 @@ pub struct Broadcaster { } impl Broadcaster { - /// Subscribe to `broadcast` and start tracking its renditions. - pub fn new(broadcast: moq_net::BroadcastConsumer, config: Config) -> Arc { + /// Subscribe to `source` and start tracking its renditions. + pub fn new(source: impl Into, config: Config) -> Arc { let (ready, _) = watch::channel(0); let (paused, _) = watch::channel(false); let broadcaster = Arc::new(Self { @@ -70,7 +70,7 @@ impl Broadcaster { ready, paused, }); - tokio::spawn(watch_catalog(broadcast, config, broadcaster.clone())); + tokio::spawn(watch_catalog(source.into(), config, broadcaster.clone())); broadcaster } @@ -140,14 +140,14 @@ impl Broadcaster { /// Add renditions newly present in `catalog`. Renditions are not removed when /// they disappear; their stores simply go stale (rare for a live broadcast). - fn sync(&self, broadcast: &moq_net::BroadcastConsumer, config: &Config, catalog: &Catalog) { + fn sync(&self, source: &moq_mux::Source, config: &Config, catalog: &Catalog) { let mut renditions = self.renditions.lock().unwrap(); for (name, video) in &catalog.video.renditions { renditions.entry(name.clone()).or_insert_with(|| { Arc::new(Rendition::video( name.clone(), video, - broadcast.clone(), + source.clone(), config, self.paused.subscribe(), )) @@ -158,7 +158,7 @@ impl Broadcaster { Arc::new(Rendition::audio( name.clone(), audio, - broadcast.clone(), + source.clone(), config, self.paused.subscribe(), )) @@ -168,8 +168,8 @@ impl Broadcaster { } } -async fn watch_catalog(broadcast: moq_net::BroadcastConsumer, config: Config, broadcaster: Arc) { - let mut consumer = match catalog::Consumer::<()>::new(&broadcast, CatalogFormat::Hang).await { +async fn watch_catalog(source: moq_mux::Source, config: Config, broadcaster: Arc) { + let mut consumer = match catalog::Consumer::<()>::new(source.broadcast(), CatalogFormat::Hang).await { Ok(consumer) => consumer, Err(err) => { tracing::warn!(%err, "failed to subscribe to broadcast catalog"); @@ -179,7 +179,7 @@ async fn watch_catalog(broadcast: moq_net::BroadcastConsumer, config: Config, br loop { match kio::wait(|waiter| consumer.poll_next(waiter)).await { - Ok(Some(catalog)) => broadcaster.sync(&broadcast, &config, &catalog), + Ok(Some(catalog)) => broadcaster.sync(&source, &config, &catalog), Ok(None) => break, Err(err) => { tracing::warn!(%err, "broadcast catalog stream ended with error"); diff --git a/rs/moq-hls/src/export/rendition.rs b/rs/moq-hls/src/export/rendition.rs index 8a6b96a22..cf276edca 100644 --- a/rs/moq-hls/src/export/rendition.rs +++ b/rs/moq-hls/src/export/rendition.rs @@ -38,7 +38,7 @@ impl Rendition { pub fn video( name: String, config: &VideoConfig, - broadcast: moq_net::BroadcastConsumer, + source: moq_mux::Source, cfg: &Config, paused: watch::Receiver, ) -> Self { @@ -48,7 +48,7 @@ impl Rendition { cfg.audio_segment_target.as_secs_f64(), cfg.window.as_secs_f64(), )); - spawn_pump(broadcast, name.clone(), Kind::Video, store.clone(), cfg.clone(), paused); + spawn_pump(source, name.clone(), Kind::Video, store.clone(), cfg.clone(), paused); Self { name, kind: Kind::Video, @@ -63,7 +63,7 @@ impl Rendition { pub fn audio( name: String, config: &AudioConfig, - broadcast: moq_net::BroadcastConsumer, + source: moq_mux::Source, cfg: &Config, paused: watch::Receiver, ) -> Self { @@ -73,7 +73,7 @@ impl Rendition { cfg.audio_segment_target.as_secs_f64(), cfg.window.as_secs_f64(), )); - spawn_pump(broadcast, name.clone(), Kind::Audio, store.clone(), cfg.clone(), paused); + spawn_pump(source, name.clone(), Kind::Audio, store.clone(), cfg.clone(), paused); Self { name, kind: Kind::Audio, @@ -87,7 +87,7 @@ impl Rendition { } fn spawn_pump( - broadcast: moq_net::BroadcastConsumer, + source: moq_mux::Source, name: String, kind: Kind, store: Arc, @@ -95,7 +95,7 @@ fn spawn_pump( paused: watch::Receiver, ) { tokio::spawn(async move { - if let Err(err) = run_pump(broadcast, &name, kind, &store, &cfg, paused).await { + if let Err(err) = run_pump(source, &name, kind, &store, &cfg, paused).await { tracing::warn!(%name, ?kind, %err, "hls rendition pump ended with error"); } // Whatever happened, mark the playlist closed so blocking readers wake. @@ -104,14 +104,14 @@ fn spawn_pump( } async fn run_pump( - broadcast: moq_net::BroadcastConsumer, + source: moq_mux::Source, name: &str, kind: Kind, store: &SegmentStore, cfg: &Config, mut paused: watch::Receiver, ) -> Result<()> { - let consumer = catalog::Consumer::<()>::new(&broadcast, CatalogFormat::Hang).await?; + let consumer = catalog::Consumer::<()>::new(source.broadcast(), CatalogFormat::Hang).await?; let mut filter = Filter::new(consumer); // Narrow *both* axes to this rendition's name so the exporter sees exactly one @@ -128,9 +128,9 @@ async fn run_pump( // A handle for noticing the broadcast close even while paused; the `Export` // below takes its own clone for pulling fragments. - let closed = broadcast.clone(); + let closed = source.broadcast().clone(); - let mut export = Export::new(broadcast, filter) + let mut export = Export::new(source, filter) .with_fragment_duration(cfg.part_target) .with_latency(cfg.latency); diff --git a/rs/moq-hls/src/server/mod.rs b/rs/moq-hls/src/server/mod.rs index 55cf68c3b..2fed717bf 100644 --- a/rs/moq-hls/src/server/mod.rs +++ b/rs/moq-hls/src/server/mod.rs @@ -65,10 +65,14 @@ impl Server { .ok() .flatten()?; + // Keep the origin attached so renditions referencing a sibling broadcast + // (the catalog `broadcast` field) can be resolved. + let source = moq_mux::Source::new(broadcast).with_origin(self.inner.origin.consume(), name); + let mut broadcasters = self.inner.broadcasters.lock().unwrap(); let broadcaster = broadcasters .entry(name.to_string()) - .or_insert_with(|| Broadcaster::new(broadcast, self.inner.config.clone())); + .or_insert_with(|| Broadcaster::new(source, self.inner.config.clone())); Some(broadcaster.clone()) } } diff --git a/rs/moq-mux/src/codec/h264/export.rs b/rs/moq-mux/src/codec/h264/export.rs index 0eead7846..234e2ff95 100644 --- a/rs/moq-mux/src/codec/h264/export.rs +++ b/rs/moq-mux/src/codec/h264/export.rs @@ -25,7 +25,7 @@ use crate::container::ExportSource; /// Single-rendition H.264 Annex-B exporter. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option, latency: Duration, track: Option, @@ -51,16 +51,16 @@ struct Avc1Convert { } impl Export { - /// Subscribe to `broadcast` and emit an Annex-B H.264 byte stream. + /// Subscribe to `source` and emit an Annex-B H.264 byte stream. /// /// `catalog` is expected to be narrowed to a single H.264 rendition (e.g. /// `consumer.filter()` with `codec = H264` then `.target()` for ABR /// selection). Renditions of other codecs are ignored; if multiple H.264 /// renditions appear in a snapshot, the first by BTreeMap order wins and /// a warning is logged. - pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + pub fn new(source: impl Into, catalog: S) -> Self { Self { - broadcast, + source: source.into(), catalog: Some(catalog), latency: Duration::ZERO, track: None, @@ -150,7 +150,7 @@ impl Export { return Ok(()); } - let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video_raw(&self.source, name, config, self.latency)?; let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { None => None, Some(avcc) => { diff --git a/rs/moq-mux/src/codec/h265/export.rs b/rs/moq-mux/src/codec/h265/export.rs index b65d9dda1..7b2930c49 100644 --- a/rs/moq-mux/src/codec/h265/export.rs +++ b/rs/moq-mux/src/codec/h265/export.rs @@ -18,7 +18,7 @@ use crate::container::ExportSource; /// Single-rendition H.265 Annex-B exporter. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option, latency: Duration, track: Option, @@ -44,14 +44,14 @@ struct Hvc1Convert { } impl Export { - /// Subscribe to `broadcast` and emit an Annex-B H.265 byte stream. + /// Subscribe to `source` and emit an Annex-B H.265 byte stream. /// /// `catalog` is expected to be narrowed to a single H.265 rendition. If /// multiple H.265 renditions appear in a snapshot, the first by BTreeMap /// order wins and a warning is logged. - pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + pub fn new(source: impl Into, catalog: S) -> Self { Self { - broadcast, + source: source.into(), catalog: Some(catalog), latency: Duration::ZERO, track: None, @@ -141,7 +141,7 @@ impl Export { return Ok(()); } - let source = ExportSource::for_video_raw(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video_raw(&self.source, name, config, self.latency)?; let convert = match config.description.as_ref().filter(|d| !d.is_empty()) { None => None, Some(hvcc) => { diff --git a/rs/moq-mux/src/container/flv/export.rs b/rs/moq-mux/src/container/flv/export.rs index 6af0a5f9d..b9da95267 100644 --- a/rs/moq-mux/src/container/flv/export.rs +++ b/rs/moq-mux/src/container/flv/export.rs @@ -72,7 +72,7 @@ impl Flavor { /// keyframe). Only Legacy and LOC container tracks (raw codec payloads) are /// supported; CMAF tracks are rejected. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option, latency: Duration, @@ -99,21 +99,22 @@ struct FlvTrack { } impl Export { - /// Subscribe to `broadcast` and produce FLV byte chunks, using the default + /// Subscribe to `source` and produce FLV byte chunks, using the default /// catalog format ([`CatalogFormat::Hang`]). - pub async fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()).await + pub async fn new(source: impl Into) -> Result { + Self::with_catalog_format(source, CatalogFormat::default()).await } - /// Subscribe to `broadcast` and produce FLV byte chunks, selecting an explicit + /// Subscribe to `source` and produce FLV byte chunks, selecting an explicit /// `catalog_format` for track discovery. pub async fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, + source: impl Into, catalog_format: CatalogFormat, ) -> Result { - let catalog = crate::catalog::Consumer::new(&broadcast, catalog_format).await?; + let source = source.into(); + let catalog = crate::catalog::Consumer::new(source.broadcast(), catalog_format).await?; Ok(Self { - broadcast, + source, catalog: Some(catalog), latency: Duration::ZERO, video: None, @@ -237,7 +238,7 @@ impl Export { (VideoCodec::AV1(av1), None) => Some(Bytes::copy_from_slice(&av1c_bytes(av1))), _ => None, }; - let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video(&self.source, name, config, self.latency)?; self.video = Some(FlvTrack { name: name.clone(), source, @@ -256,7 +257,7 @@ impl Export { { let flavor = audio_flavor(config)?; ensure_legacy(&config.container, "audio", name)?; - let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_audio(&self.source, name, config, self.latency)?; self.audio = Some(FlvTrack { name: name.clone(), source, diff --git a/rs/moq-mux/src/container/fmp4/export.rs b/rs/moq-mux/src/container/fmp4/export.rs index ce4981ace..773b17717 100644 --- a/rs/moq-mux/src/container/fmp4/export.rs +++ b/rs/moq-mux/src/container/fmp4/export.rs @@ -15,7 +15,8 @@ use moq_net::Timestamp; /// Subscribe to a moq broadcast and produce a single fMP4 / CMAF byte stream. /// -/// Built from a [`moq_net::BroadcastConsumer`], `Export` subscribes to the hang catalog, +/// Built from a [`Source`](crate::Source) (or a bare [`moq_net::BroadcastConsumer`] via +/// `Into`), `Export` subscribes to the hang catalog, /// (un)subscribes per-rendition tracks as the catalog changes, decodes both Legacy and /// CMAF tracks via a per-track source, and re-encodes everything as a merged init /// segment + moof+mdat fragments in presentation-timestamp order across tracks. This @@ -35,7 +36,7 @@ use moq_net::Timestamp; /// onto segments and parts; narrow the catalog to a single rendition with /// [`catalog::Filter`](crate::catalog::Filter) so the fragments belong to one track. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option, latency: Duration, fragment_duration: Option, @@ -100,16 +101,16 @@ struct Fmp4Track { } impl Export { - /// Subscribe to `broadcast` and produce fMP4 byte chunks, driving track + /// Subscribe to `source` and produce fMP4 byte chunks, driving track /// (un)subscription from `catalog`. /// /// `catalog` is any [`Stream`] of catalog snapshots, typically a /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in /// [`catalog::Filter`](crate::catalog::Filter) / /// [`catalog::Target`](crate::catalog::Target) to narrow the rendition set. - pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + pub fn new(source: impl Into, catalog: S) -> Self { Self { - broadcast, + source: source.into(), catalog: Some(catalog), latency: Duration::ZERO, fragment_duration: None, @@ -315,7 +316,7 @@ impl Export { if self.tracks.contains_key(name) { continue; } - let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video(&self.source, name, config, self.latency)?; let timescale = catalog_timescale_video(config); self.tracks.insert( name.clone(), @@ -339,7 +340,7 @@ impl Export { if self.tracks.contains_key(name) { continue; } - let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_audio(&self.source, name, config, self.latency)?; let timescale = catalog_timescale_audio(config); self.tracks.insert( name.clone(), diff --git a/rs/moq-mux/src/container/mkv/export.rs b/rs/moq-mux/src/container/mkv/export.rs index da28ee9cd..69acdbf24 100644 --- a/rs/moq-mux/src/container/mkv/export.rs +++ b/rs/moq-mux/src/container/mkv/export.rs @@ -19,7 +19,8 @@ const TIMESTAMP_SCALE_NS: u64 = 1_000_000; /// Subscribe to a moq broadcast and produce a single Matroska / WebM byte stream. /// -/// Built from a [`moq_net::BroadcastConsumer`], `Export` subscribes to the hang catalog, +/// Built from a [`Source`](crate::Source) (or a bare [`moq_net::BroadcastConsumer`] via +/// `Into`), `Export` subscribes to the hang catalog, /// (un)subscribes per-rendition tracks, decodes them via a per-track source, and /// re-encodes everything as EBML + Segment + Tracks + Cluster/SimpleBlock tags ready /// for any Matroska-aware consumer (ffplay, libwebm, browser MSE for WebM). @@ -45,7 +46,7 @@ const TIMESTAMP_SCALE_NS: u64 = 1_000_000; /// Only Legacy-container tracks (raw codec payloads) are supported. CMAF tracks /// (moof+mdat passthrough) are rejected with a clear error. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option, latency: Duration, fragment_duration: Option, @@ -152,15 +153,15 @@ impl ClusterBuilder { } impl Export { - /// Subscribe to `broadcast` and produce MKV byte chunks, driving track + /// Subscribe to `source` and produce MKV byte chunks, driving track /// (un)subscription from `catalog`. /// /// `catalog` is any [`Stream`] of catalog snapshots, typically a /// [`catalog::Consumer`](crate::catalog::Consumer) directly, or wrapped in /// [`catalog::Filter`](crate::catalog::Filter) to narrow the rendition set. - pub fn new(broadcast: moq_net::BroadcastConsumer, catalog: S) -> Self { + pub fn new(source: impl Into, catalog: S) -> Self { Self { - broadcast, + source: source.into(), catalog: Some(catalog), latency: Duration::ZERO, fragment_duration: None, @@ -329,7 +330,7 @@ impl Export { continue; } ensure_legacy(&config.container, "video", name)?; - let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video(&self.source, name, config, self.latency)?; self.tracks.insert( name.clone(), MkvTrack { @@ -348,7 +349,7 @@ impl Export { continue; } ensure_legacy(&config.container, "audio", name)?; - let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_audio(&self.source, name, config, self.latency)?; self.tracks.insert( name.clone(), MkvTrack { diff --git a/rs/moq-mux/src/container/source.rs b/rs/moq-mux/src/container/source.rs index d2ac7fe45..ce2163ad8 100644 --- a/rs/moq-mux/src/container/source.rs +++ b/rs/moq-mux/src/container/source.rs @@ -48,6 +48,9 @@ impl VideoTransform { /// A subscription that resolves on first poll, then the live consumer. enum SourceState { + /// Waiting for a cross-broadcast reference to resolve into a broadcast; the + /// track (by name) is subscribed once it does. + Requesting(kio::Pending, String), /// Waiting for the subscription to resolve (blocks on the publisher's SUBSCRIBE_OK). Subscribing(kio::Pending), /// The resolved consumer, reading frames. Boxed because it's much larger than @@ -55,6 +58,15 @@ enum SourceState { Active(Box>), } +impl From for SourceState { + fn from(subscribe: crate::source::Subscribe) -> Self { + match subscribe { + crate::source::Subscribe::Track(pending) => Self::Subscribing(pending), + crate::source::Subscribe::Broadcast(pending, name) => Self::Requesting(pending, name), + } + } +} + /// A per-rendition source that normalizes frame shape (Annex-B → /// length-prefixed for H.264/H.265) and exposes the resolved codec config /// record alongside the frame stream. @@ -73,7 +85,7 @@ pub(crate) struct ExportSource { impl ExportSource { /// Subscribe to a video rendition and build an `ExportSource`. pub fn for_video( - broadcast: &moq_net::BroadcastConsumer, + source: &crate::Source, name: &str, config: &VideoConfig, latency: Duration, @@ -83,7 +95,7 @@ impl ExportSource { let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned(); Ok(Self { - state: SourceState::Subscribing(broadcast.track(name)?.subscribe(None)), + state: source.subscribe(config.broadcast.as_ref(), name)?.into(), media: Some(media), latency, transform, @@ -96,7 +108,7 @@ impl ExportSource { /// avc1 length-prefixed stays length-prefixed). The Annex-B exporter /// uses this to keep parameter sets in-band. pub fn for_video_raw( - broadcast: &moq_net::BroadcastConsumer, + source: &crate::Source, name: &str, config: &VideoConfig, latency: Duration, @@ -105,7 +117,7 @@ impl ExportSource { let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned(); Ok(Self { - state: SourceState::Subscribing(broadcast.track(name)?.subscribe(None)), + state: source.subscribe(config.broadcast.as_ref(), name)?.into(), media: Some(media), latency, transform: None, @@ -116,7 +128,7 @@ impl ExportSource { /// Subscribe to an audio rendition. Audio has no codec-shape transform; /// `description` is taken straight from the catalog. pub fn for_audio( - broadcast: &moq_net::BroadcastConsumer, + source: &crate::Source, name: &str, config: &AudioConfig, latency: Duration, @@ -125,7 +137,7 @@ impl ExportSource { let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned(); Ok(Self { - state: SourceState::Subscribing(broadcast.track(name)?.subscribe(None)), + state: source.subscribe(config.broadcast.as_ref(), name)?.into(), media: Some(media), latency, transform: None, @@ -136,13 +148,9 @@ impl ExportSource { /// Subscribe to a verbatim `mpegts` stream rendition (SCTE-35, private PES, ...). /// No codec-shape transform and no description: the frames are Legacy-framed /// verbatim bytes the muxer writes back out as PES or private sections. - pub fn for_stream( - broadcast: &moq_net::BroadcastConsumer, - name: &str, - latency: Duration, - ) -> Result { + pub fn for_stream(source: &crate::Source, name: &str, latency: Duration) -> Result { Ok(Self { - state: SourceState::Subscribing(broadcast.track(name)?.subscribe(None)), + state: source.subscribe(None, name)?.into(), media: Some(HangContainer::Legacy), latency, transform: None, @@ -167,6 +175,21 @@ impl ExportSource { /// absorbed and the next frame is polled. Returns `Ready(None)` at /// end-of-track. pub fn poll_read(&mut self, waiter: &kio::Waiter) -> Poll>> { + // Resolve a cross-broadcast reference into a broadcast before subscribing. + if matches!(self.state, SourceState::Requesting(..)) { + let (broadcast, name) = { + let SourceState::Requesting(pending, name) = &self.state else { + unreachable!("just matched Requesting"); + }; + match pending.poll_ok(waiter) { + Poll::Ready(Ok(broadcast)) => (broadcast, name.clone()), + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + Poll::Pending => return Poll::Pending, + } + }; + self.state = SourceState::Subscribing(broadcast.track(&name)?.subscribe(None)); + } + // Resolve the subscription before reading any frames. if matches!(self.state, SourceState::Subscribing(_)) { // Scope the `pending` borrow so it ends before we touch `self.media`/`self.state`. diff --git a/rs/moq-mux/src/container/ts/export.rs b/rs/moq-mux/src/container/ts/export.rs index 08119c3bc..d7d59aaac 100644 --- a/rs/moq-mux/src/container/ts/export.rs +++ b/rs/moq-mux/src/container/ts/export.rs @@ -54,7 +54,7 @@ const PSI_INTERVAL: Duration = Duration::from_millis(500); /// timestamp), and is re-emitted at video keyframes and periodically for /// mid-stream tune-in. Returns `None` when the broadcast ends. pub struct Export { - broadcast: moq_net::BroadcastConsumer, + source: crate::Source, catalog: Option>, latency: Duration, @@ -150,41 +150,41 @@ struct PesUnit { } impl Export { - /// Subscribe to `broadcast`, using the default catalog format. - pub async fn new(broadcast: moq_net::BroadcastConsumer) -> Result { - Self::with_catalog_format(broadcast, CatalogFormat::default()).await + /// Subscribe to `source`, using the default catalog format. + pub async fn new(source: impl Into) -> Result { + Self::with_catalog_format(source, CatalogFormat::default()).await } - /// Subscribe to `broadcast`, selecting an explicit catalog format. Media only; + /// Subscribe to `source`, selecting an explicit catalog format. Media only; /// any catalog extension (e.g. the `mpegts` verbatim streams) is ignored. pub async fn with_catalog_format( - broadcast: moq_net::BroadcastConsumer, + source: impl Into, catalog_format: CatalogFormat, ) -> Result { - Self::build(broadcast, catalog_format).await + Self::build(source.into(), catalog_format).await } } impl Export { - /// Subscribe to `broadcast`, exporting its `mpegts` verbatim streams (SCTE-35, + /// Subscribe to `source`, exporting its `mpegts` verbatim streams (SCTE-35, /// private data, ...) back to MPEG-TS alongside the media. The `Self` type pins /// the extension, so callers write `Export::with_ts(..)` with no turbofish (the /// plain constructors are media-only). pub async fn with_ts( - broadcast: moq_net::BroadcastConsumer, + source: impl Into, catalog_format: CatalogFormat, ) -> Result { - Self::build(broadcast, catalog_format).await + Self::build(source.into(), catalog_format).await } } impl Export { /// Shared constructor. The public entry points each live on a concrete /// `Export` impl that pins `E`, so the extension is chosen by which one you call. - async fn build(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result { - let catalog = crate::catalog::Consumer::::new(&broadcast, catalog_format).await?; + async fn build(source: crate::Source, catalog_format: CatalogFormat) -> Result { + let catalog = crate::catalog::Consumer::::new(source.broadcast(), catalog_format).await?; Ok(Self { - broadcast, + source, catalog: Some(catalog), latency: Duration::ZERO, tracks: HashMap::new(), @@ -406,7 +406,7 @@ impl Export { self.tracks.insert(name.clone(), track); } None => { - let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_video(&self.source, name, config, self.latency)?; self.insert_track(name, source, pid, kind, descriptors, reserve); } } @@ -423,7 +423,7 @@ impl Export { self.tracks.insert(name.clone(), track); } None => { - let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?; + let source = ExportSource::for_audio(&self.source, name, config, self.latency)?; self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } @@ -447,7 +447,7 @@ impl Export { self.tracks.insert(name.clone(), existing); } None => { - let source = ExportSource::for_stream(&self.broadcast, name, self.latency)?; + let source = ExportSource::for_stream(&self.source, name, self.latency)?; self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE); } } diff --git a/rs/moq-mux/src/error.rs b/rs/moq-mux/src/error.rs index a6e66f0d6..2804da5ba 100644 --- a/rs/moq-mux/src/error.rs +++ b/rs/moq-mux/src/error.rs @@ -105,6 +105,11 @@ pub enum Error { /// reserved media section (`video`/`audio`). #[error("reserved catalog section: {0}")] ReservedSection(String), + + /// The catalog references a track in another broadcast, but the export was built + /// without origin context to resolve it. See [`Source::with_origin`](crate::Source::with_origin). + #[error("catalog references another broadcast: {0}")] + MissingOrigin(moq_net::PathRelativeOwned), } impl From for Error { diff --git a/rs/moq-mux/src/lib.rs b/rs/moq-mux/src/lib.rs index 6d1242ab5..4c1bd978f 100644 --- a/rs/moq-mux/src/lib.rs +++ b/rs/moq-mux/src/lib.rs @@ -21,6 +21,8 @@ pub mod codec; pub mod container; mod error; pub mod import; +mod source; pub use clock::Clock; pub use error::*; +pub use source::Source; diff --git a/rs/moq-mux/src/source.rs b/rs/moq-mux/src/source.rs new file mode 100644 index 000000000..882a01198 --- /dev/null +++ b/rs/moq-mux/src/source.rs @@ -0,0 +1,173 @@ +//! Export input: the catalog broadcast plus optional origin context. +//! +//! A hang catalog rendition may reference a track published in *another* +//! broadcast via its `broadcast` field (a path relative to the catalog's +//! broadcast, e.g. `../source`). Resolving that reference requires more than a +//! [`moq_net::BroadcastConsumer`]: it needs the catalog broadcast's own path and +//! an [`moq_net::OriginConsumer`] to fetch the referenced broadcast from. +//! [`Source`] bundles the three so exporters can subscribe to any rendition. + +use moq_net::AsPath; + +/// The subscription side of an export: the broadcast whose catalog drives it, +/// plus optional origin context for resolving cross-broadcast rendition references. +/// +/// Build one from a bare [`moq_net::BroadcastConsumer`] (via `From` or [`Source::new`]) +/// when every track lives in the catalog's own broadcast. Add origin context with +/// [`Source::with_origin`] to also serve catalogs whose renditions reference sibling +/// broadcasts; without it, such a rendition fails with [`Error::MissingOrigin`](crate::Error::MissingOrigin). +#[derive(Clone)] +pub struct Source { + broadcast: moq_net::BroadcastConsumer, + origin: Option<(moq_net::OriginConsumer, moq_net::PathOwned)>, +} + +impl Source { + /// A source without origin context: every track must live in the catalog's broadcast. + pub fn new(broadcast: moq_net::BroadcastConsumer) -> Self { + Self { + broadcast, + origin: None, + } + } + + /// Attach the origin the catalog broadcast came from and the path it lives at, + /// enabling renditions that reference another broadcast (e.g. `../source`). + /// + /// The relative reference is resolved against `path` and fetched via + /// [`moq_net::OriginConsumer::request_broadcast`], so the referenced broadcast must + /// be reachable through `origin` (announced, or served by a dynamic handler). + pub fn with_origin(mut self, origin: moq_net::OriginConsumer, path: impl AsPath) -> Self { + self.origin = Some((origin, path.as_path().to_owned())); + self + } + + /// The broadcast whose catalog drives the export. + pub fn broadcast(&self) -> &moq_net::BroadcastConsumer { + &self.broadcast + } + + /// Start subscribing to `name`, honoring an optional cross-broadcast reference. + /// + /// A missing/empty `rel`, or one that resolves back to the catalog's own path (or + /// to the origin root), subscribes on the catalog broadcast directly. Anything else + /// requests the resolved broadcast from the origin first. + pub(crate) fn subscribe(&self, rel: Option<&moq_net::PathRelative<'_>>, name: &str) -> crate::Result { + if let Some(rel) = rel.filter(|rel| !rel.is_empty()) { + let Some((origin, base)) = &self.origin else { + return Err(crate::Error::MissingOrigin(rel.to_owned())); + }; + + let resolved = base.resolve(rel); + + // A reference that walks back to the catalog's own broadcast is served by + // the catalog broadcast itself, avoiding a redundant subscription. Excess + // `..` resolving to the (empty) origin root is not a broadcast; treat it + // the same way rather than requesting an unrouteable path. + if !resolved.is_empty() && resolved != *base { + return Ok(Subscribe::Broadcast( + origin.request_broadcast(&resolved), + name.to_string(), + )); + } + } + + Ok(Subscribe::Track(self.broadcast.track(name)?.subscribe(None))) + } +} + +impl From for Source { + fn from(broadcast: moq_net::BroadcastConsumer) -> Self { + Self::new(broadcast) + } +} + +/// A pending rendition subscription, either direct or via a referenced broadcast. +pub(crate) enum Subscribe { + /// Subscribing on the catalog broadcast. + Track(kio::Pending), + /// Waiting for the referenced broadcast; the track (by name) is subscribed once it resolves. + Broadcast(kio::Pending, String), +} + +#[cfg(test)] +mod tests { + use super::*; + use moq_net::{BroadcastInfo, Origin, PathRelative}; + + fn broadcast() -> moq_net::BroadcastProducer { + BroadcastInfo::new().produce() + } + + #[test] + fn no_override_subscribes_catalog_broadcast() { + let producer = broadcast(); + // Keep a dynamic handle alive so track requests pend instead of NotFound. + let _dynamic = producer.dynamic(); + let source = Source::new(producer.consume()); + + assert!(matches!(source.subscribe(None, "video").unwrap(), Subscribe::Track(_))); + // An empty rel is the same as no rel. + let empty = PathRelative::empty(); + assert!(matches!( + source.subscribe(Some(&empty), "video").unwrap(), + Subscribe::Track(_) + )); + } + + #[test] + fn override_without_origin_fails() { + let producer = broadcast(); + let source = Source::new(producer.consume()); + + let rel = PathRelative::new("../other"); + assert!(matches!( + source.subscribe(Some(&rel), "video"), + Err(crate::Error::MissingOrigin(_)) + )); + } + + #[test] + fn self_reference_subscribes_catalog_broadcast() { + let origin = Origin::random().produce(); + let producer = broadcast(); + let _dynamic = producer.dynamic(); + let _publish = origin.publish_broadcast("a/pub", &producer).unwrap(); + + let source = Source::new(producer.consume()).with_origin(origin.consume(), "a/pub"); + + // Walks back to the catalog's own path. + let rel = PathRelative::new("../pub"); + assert!(matches!( + source.subscribe(Some(&rel), "video").unwrap(), + Subscribe::Track(_) + )); + + // Excess `..` resolves to the (empty) origin root, which is not a broadcast. + let rel = PathRelative::new("../../.."); + assert!(matches!( + source.subscribe(Some(&rel), "video").unwrap(), + Subscribe::Track(_) + )); + } + + #[tokio::test] + async fn override_resolves_referenced_broadcast() { + let origin = Origin::random().produce(); + + let catalog = broadcast(); + let _catalog_publish = origin.publish_broadcast("a/pub", &catalog).unwrap(); + + let referenced = broadcast(); + let _referenced_publish = origin.publish_broadcast("a/source", &referenced).unwrap(); + + let source = Source::new(catalog.consume()).with_origin(origin.consume(), "a/pub"); + + let rel = PathRelative::new("../source"); + let Subscribe::Broadcast(pending, name) = source.subscribe(Some(&rel), "video").unwrap() else { + panic!("expected a cross-broadcast subscription"); + }; + assert_eq!(name, "video"); + pending.await.expect("referenced broadcast should resolve"); + } +} diff --git a/rs/moq-net/src/path.rs b/rs/moq-net/src/path.rs index 94fd37f03..080b9bb29 100644 --- a/rs/moq-net/src/path.rs +++ b/rs/moq-net/src/path.rs @@ -232,6 +232,45 @@ impl<'a> Path<'a> { Path(Cow::Owned(format!("{}/{}", self.0, other.as_str()))) } } + + /// Resolve a [`PathRelative`] against this path. + /// + /// `..` segments in `rel` pop the last segment of the base; other segments are appended. + /// Excess `..` is a no-op once the base is empty (subsequent named segments still append). + /// An empty `rel` returns this path as an owned copy. + /// + /// [`PathRelative::new`] strips `.` and empty segments, so they are not handled here. + /// + /// # Examples + /// ``` + /// use moq_net::{Path, PathRelative}; + /// + /// let base = Path::new("a/b/c"); + /// assert_eq!(base.resolve(&PathRelative::new("../d")).as_str(), "a/b/d"); + /// assert_eq!(base.resolve(&PathRelative::new("d")).as_str(), "a/b/c/d"); + /// assert_eq!(base.resolve(&PathRelative::new("../../../../x")).as_str(), "x"); + /// ``` + pub fn resolve(&self, rel: &PathRelative<'_>) -> PathOwned { + if rel.is_empty() { + return self.to_owned(); + } + + let mut segments: Vec<&str> = if self.0.is_empty() { + Vec::new() + } else { + self.0.split('/').collect() + }; + + for seg in rel.as_str().split('/') { + if seg == ".." { + segments.pop(); + } else { + segments.push(seg); + } + } + + Path(Cow::Owned(segments.join("/"))) + } } impl<'a> From<&'a str> for Path<'a> { @@ -320,6 +359,157 @@ impl<'de: 'a, 'a> serde::Deserialize<'de> for Path<'a> { } } +/// An owned version of [`PathRelative`] with a `'static` lifetime. +pub type PathRelativeOwned = PathRelative<'static>; + +/// A relative broadcast path, used to reference one broadcast from another broadcast's content. +/// +/// Unlike [`Path`] (which is a complete reference within the broadcast namespace), +/// `PathRelative` may contain `..` segments to walk up the namespace and is meaningful only +/// when resolved against a base [`Path`] via [`Path::resolve`]. The hang catalog uses it to +/// point a rendition at a track published in a sibling broadcast (e.g. `../source`). +/// +/// `PathRelative` has no `Encode`/`Decode` impl, so it never appears in announce/subscribe +/// frames. It does serialize via serde for off-wire use (e.g. as a field inside a catalog +/// JSON payload, which itself travels as a track). +/// +/// Normalization on creation: leading/trailing slashes are trimmed, consecutive internal +/// slashes collapse to one, and `.` segments are stripped (treated as no-ops, matching +/// POSIX). `..` is preserved and is interpreted at resolve time. +/// +/// # Examples +/// ``` +/// use moq_net::{Path, PathRelative}; +/// +/// let rel = PathRelative::new("../source"); +/// assert_eq!(Path::new("a/b").resolve(&rel).as_str(), "a/source"); +/// +/// // `.` segments are stripped on creation. +/// assert_eq!(PathRelative::new("./a/./b").as_str(), "a/b"); +/// ``` +#[derive(Debug, PartialEq, Eq, Hash, Clone, serde::Serialize)] +pub struct PathRelative<'a>(Cow<'a, str>); + +impl<'a> PathRelative<'a> { + /// Create a new `PathRelative` from a string slice. + /// + /// Leading and trailing slashes are trimmed, consecutive internal slashes collapse to one, + /// and `.` segments are stripped. See the type-level doc for the full normalization rules. + pub fn new(s: &'a str) -> Self { + let trimmed = s.trim_start_matches('/').trim_end_matches('/'); + + if needs_normalize_relative(trimmed) { + Self(Cow::Owned(normalize_relative_segments(trimmed))) + } else { + Self(Cow::Borrowed(trimmed)) + } + } + + /// The normalized path as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } + + /// The empty relative path, which resolves to the base path itself. + pub fn empty() -> PathRelative<'static> { + PathRelative(Cow::Borrowed("")) + } + + /// True if the path is empty (resolves to the base path itself). + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// The length of the normalized path in bytes. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Copy into an owned version with a `'static` lifetime. + pub fn to_owned(&self) -> PathRelativeOwned { + PathRelative(Cow::Owned(self.0.to_string())) + } + + /// Convert into an owned version with a `'static` lifetime. + pub fn into_owned(self) -> PathRelativeOwned { + PathRelative(Cow::Owned(self.0.into_owned())) + } + + /// Reborrow without copying. + pub fn borrow(&'a self) -> PathRelative<'a> { + PathRelative(Cow::Borrowed(&self.0)) + } +} + +impl<'a> From<&'a str> for PathRelative<'a> { + fn from(s: &'a str) -> Self { + Self::new(s) + } +} + +impl<'a> From<&'a String> for PathRelative<'a> { + fn from(s: &'a String) -> Self { + Self::new(s) + } +} + +impl From for PathRelative<'_> { + fn from(s: String) -> Self { + let trimmed = s.trim_start_matches('/').trim_end_matches('/'); + + if needs_normalize_relative(trimmed) { + Self(Cow::Owned(normalize_relative_segments(trimmed))) + } else if trimmed == s { + Self(Cow::Owned(s)) + } else { + Self(Cow::Owned(trimmed.to_string())) + } + } +} + +fn needs_normalize_relative(trimmed: &str) -> bool { + trimmed.split('/').any(|seg| seg.is_empty() || seg == ".") +} + +fn normalize_relative_segments(trimmed: &str) -> String { + trimmed + .split('/') + .filter(|seg| !seg.is_empty() && *seg != ".") + .collect::>() + .join("/") +} + +impl Default for PathRelative<'_> { + fn default() -> Self { + Self(Cow::Borrowed("")) + } +} + +impl AsRef for PathRelative<'_> { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Display for PathRelative<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +// Owned-only deserialization. We use `String::deserialize` so that owned deserializers +// (e.g. `serde_json::from_slice`) work. The borrowed form `<&str>::deserialize` requires +// `'de: 'a`, which is unsatisfiable when `'a = 'static`. +impl<'de> serde::Deserialize<'de> for PathRelative<'static> { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(PathRelative::from(s)) + } +} + /// A deduplicated list of path prefixes. /// /// Automatically removes exact duplicates and overlapping prefixes on construction. @@ -963,4 +1153,84 @@ mod tests { let b = PathPrefixes::new(["bar", "foo"]); assert_eq!(a, b); } + + #[test] + fn test_path_relative_normalize() { + assert_eq!(PathRelative::new("foo").as_str(), "foo"); + assert_eq!(PathRelative::new("/foo/").as_str(), "foo"); + assert_eq!(PathRelative::new("foo//bar").as_str(), "foo/bar"); + assert_eq!(PathRelative::new("../foo").as_str(), "../foo"); + assert_eq!(PathRelative::new("../../a/b").as_str(), "../../a/b"); + assert!(PathRelative::new("").is_empty()); + } + + #[test] + fn test_path_relative_strips_dot_segments() { + assert_eq!(PathRelative::new(".").as_str(), ""); + assert_eq!(PathRelative::new("./foo").as_str(), "foo"); + assert_eq!(PathRelative::new("foo/./bar").as_str(), "foo/bar"); + assert_eq!(PathRelative::new("./../foo").as_str(), "../foo"); + // From takes the same normalization. + assert_eq!(PathRelative::from("./foo".to_string()).as_str(), "foo"); + assert_eq!(PathRelative::from(".".to_string()).as_str(), ""); + } + + #[test] + fn test_resolve_no_dotdot() { + let base = Path::new("a/b"); + assert_eq!(base.resolve(&PathRelative::new("c")).as_str(), "a/b/c"); + assert_eq!(base.resolve(&PathRelative::new("c/d")).as_str(), "a/b/c/d"); + } + + #[test] + fn test_resolve_empty_rel_returns_base() { + let base = Path::new("a/b"); + assert_eq!(base.resolve(&PathRelative::new("")).as_str(), "a/b"); + } + + #[test] + fn test_resolve_single_dotdot() { + let base = Path::new("a/b/c"); + assert_eq!(base.resolve(&PathRelative::new("../d")).as_str(), "a/b/d"); + assert_eq!(base.resolve(&PathRelative::new("..")).as_str(), "a/b"); + } + + #[test] + fn test_resolve_multiple_dotdot() { + let base = Path::new("a/b/c"); + assert_eq!(base.resolve(&PathRelative::new("../../x")).as_str(), "a/x"); + assert_eq!(base.resolve(&PathRelative::new("../../../x")).as_str(), "x"); + } + + #[test] + fn test_resolve_dotdot_clamps_at_root() { + let base = Path::new("a"); + // Excess `..` clamps at the root instead of escaping it. + assert_eq!(base.resolve(&PathRelative::new("../../../foo")).as_str(), "foo"); + assert_eq!(base.resolve(&PathRelative::new("..")).as_str(), ""); + } + + #[test] + fn test_resolve_empty_base() { + let base = Path::empty(); + assert_eq!(base.resolve(&PathRelative::new("foo")).as_str(), "foo"); + assert_eq!(base.resolve(&PathRelative::new("..")).as_str(), ""); + } + + #[test] + fn test_resolve_dot_is_noop() { + let base = Path::new("a/b"); + // `.` is normalized away by PathRelative::new, so resolve ignores it. + assert_eq!(base.resolve(&PathRelative::new(".")).as_str(), "a/b"); + assert_eq!(base.resolve(&PathRelative::new("./c")).as_str(), "a/b/c"); + assert_eq!(base.resolve(&PathRelative::new("./../c")).as_str(), "a/c"); + } + + #[test] + fn test_resolve_self_reference_via_dotdot() { + // Walking `..` back to the same path yields the base unchanged, which lets the + // caller compare resolved == base to detect a self-reference. + let base = Path::new("a/b"); + assert_eq!(base.resolve(&PathRelative::new("../b")).as_str(), "a/b"); + } } diff --git a/rs/moq-srt/src/ts.rs b/rs/moq-srt/src/ts.rs index baee45abf..2a9f12243 100644 --- a/rs/moq-srt/src/ts.rs +++ b/rs/moq-srt/src/ts.rs @@ -79,7 +79,11 @@ impl Subscriber { return Ok(None); }; - let export = ts::Export::new(broadcast).await?; + // Keep the origin attached so renditions referencing a sibling broadcast + // (the catalog `broadcast` field) can be resolved. + let source = moq_mux::Source::new(broadcast).with_origin(origin.consume(), path); + + let export = ts::Export::new(source).await?; Ok(Some(Self { export })) }