diff --git a/bun.lock b/bun.lock index e6d864a15..34b5ba02d 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "moq", diff --git a/js/net/src/connection/connect.ts b/js/net/src/connection/connect.ts index dfa3fa4be..37befac47 100644 --- a/js/net/src/connection/connect.ts +++ b/js/net/src/connection/connect.ts @@ -141,6 +141,10 @@ export async function connect(url: URL, props?: ConnectProps): Promise { const tasks: Promise[] = [this.#runSession(), this.#runBidis(), this.#runUnis()]; + // moq-lite-05+: each endpoint opens a Setup stream and sends a single SETUP message. + if (hasSetupStream(this.#version)) { + tasks.push(this.#runSetup()); + } + if (this.sendBandwidth) { tasks.push(this.#runSendBandwidth(this.sendBandwidth)); } @@ -184,6 +191,9 @@ export class Connection implements Established { } else if (typ === StreamId.Subscribe) { const msg = await Subscribe.decode(stream.reader, this.#version); await this.#publisher.runSubscribe(msg, stream); + } else if (typ === StreamId.Track) { + const msg = await Track.decode(stream.reader, this.#version); + await this.#publisher.runTrackInfo(msg, stream); } else if (typ === StreamId.Probe) { await this.#publisher.runProbe(stream); } else if (typ === StreamId.Goaway) { @@ -213,14 +223,31 @@ export class Connection implements Established { async #runUni(stream: Reader) { const typ = await stream.u8(); - if (typ === 0) { + if (typ === DataId.Group) { const msg = await Group.decode(stream); await this.#subscriber.runGroup(msg, stream); + } else if (typ === DataId.Setup) { + // moq-lite-05+: read the peer's SETUP and discard it (the path, if any, + // is handled at accept time by the server's transport binding). + await Setup.decode(stream, this.#version); } else { throw new Error(`unknown stream type: ${typ.toString()}`); } } + // moq-lite-05+: open a unidirectional Setup stream, send a single SETUP, and FIN. + // The browser conveys its path via the WebTransport URL, so the SETUP is empty. + async #runSetup(): Promise { + const writer = await Writer.open(this.#quic); + try { + await writer.u8(DataId.Setup); + await new Setup().encode(writer, this.#version); + writer.close(); + } catch (err: unknown) { + writer.reset(err); + } + } + async #runSendBandwidth(bandwidth: Bandwidth): Promise { const quic = this.#quic as unknown as { getStats: () => Promise<{ estimatedSendRate: number | null }>; diff --git a/js/net/src/lite/publisher.ts b/js/net/src/lite/publisher.ts index 1bcfe6786..51647f08f 100644 --- a/js/net/src/lite/publisher.ts +++ b/js/net/src/lite/publisher.ts @@ -9,8 +9,10 @@ import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts"; import { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; -import { encodeSubscribeResponse, type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; -import { Version } from "./version.ts"; +import { encodeSubscribeResponse, type Subscribe, SubscribeEnd, SubscribeOk, SubscribeUpdate } from "./subscribe.ts"; +import type { Track as TrackMessage } from "./track.ts"; +import { TrackInfo } from "./track.ts"; +import { hasTrackStream, Version } from "./version.ts"; const PROBE_INTERVAL = 100; // ms const PROBE_MAX_AGE = 10_000; // ms @@ -170,7 +172,11 @@ export class Publisher { const track = broadcast.subscribe(msg.track, msg.priority); try { - const info = new SubscribeOk({ priority: msg.priority }); + // moq-lite-05+ trims SUBSCRIBE_OK to the resolved start group; the publisher + // properties move to TRACK_INFO. Earlier versions echo the priority inline. + const info = hasTrackStream(this.version) + ? new SubscribeOk({ startGroup: msg.startGroup ?? 0 }) + : new SubscribeOk({ priority: msg.priority }); await encodeSubscribeResponse(stream.writer, { ok: info }, this.version); console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`); @@ -202,6 +208,23 @@ export class Publisher { } } + /** + * Serves a Track stream (moq-lite-05+): replies with the track's immutable TRACK_INFO. + * + * @internal + */ + async runTrackInfo(msg: TrackMessage, stream: Stream) { + const broadcast = this.#broadcasts.peek()?.get(msg.broadcast); + if (!broadcast) { + throw new Error("not found"); + } + + // The publisher properties aren't tracked in the browser model yet; report + // defaults with a millisecond timescale matching the wall-clock frame timestamps. + const info = new TrackInfo({ timescale: 1000 }); + await info.encode(stream.writer, this.version); + } + /** * Runs a track and sends its data to the stream. * @param sub - The subscription ID @@ -213,15 +236,32 @@ export class Publisher { */ async #runTrack(sub: bigint, broadcast: Path.Valid, track: Track, stream: Writer) { try { + let lastSequence: number | undefined; + let trackEnded = false; + for (;;) { const next = track.recvGroup(); - const group = await Promise.race([next, stream.closed]); - if (!group) { + const res = await Promise.race([ + next.then((group) => ({ group })), + stream.closed.then(() => "closed" as const), + ]); + if (res === "closed") { next.then((group) => group?.close()).catch(() => {}); break; } + if (!res.group) { + trackEnded = true; + break; + } - void this.#runGroup(sub, group); + lastSequence = res.group.sequence; + void this.#runGroup(sub, res.group); + } + + // moq-lite-05+: signal end-of-track before the subscribe stream FINs. + if (trackEnded && hasTrackStream(this.version)) { + const end = new SubscribeEnd({ group: lastSequence ?? 0 }); + await encodeSubscribeResponse(stream, { end }, this.version); } console.debug(`publish close: broadcast=${broadcast} track=${track.name}`); @@ -250,10 +290,21 @@ export class Publisher { await msg.encode(stream); try { + // moq-lite-05+ stamps each frame with a wall-clock millisecond timestamp, sent + // as a zigzag delta from the previous frame (the first frame is a delta from 0). + let prevTimestamp = 0n; + for (;;) { const frame = await Promise.race([group.readFrame(), stream.closed]); if (!frame) break; + if (hasTrackStream(this.version)) { + const now = BigInt(Date.now()); + const delta = now - prevTimestamp; + prevTimestamp = now; + await stream.u62((delta << 1n) ^ (delta >> 63n)); + } + await stream.u53(frame.byteLength); await stream.write(frame); } diff --git a/js/net/src/lite/setup.ts b/js/net/src/lite/setup.ts new file mode 100644 index 000000000..e96c8f1ec --- /dev/null +++ b/js/net/src/lite/setup.ts @@ -0,0 +1,58 @@ +import type { Reader, Writer } from "../stream.ts"; +import * as Message from "./message.ts"; +import { hasSetupStream, type Version } from "./version.ts"; + +/// Setup parameter ID for the request Path (client-only, on URI-less transports). +const PARAM_PATH = 0x2; + +/** + * The moq-lite-05 SETUP message, sent once per endpoint on a unidirectional Setup stream. + * + * The browser conveys its path via the WebTransport URL, so it never sends a Path + * parameter; this class still parses one for completeness when reading a peer's SETUP. + */ +export class Setup { + /// The request path, only sent on transport bindings without a request URI. + path?: string; + + constructor(props: { path?: string } = {}) { + this.path = props.path; + } + + async #encode(w: Writer) { + if (this.path !== undefined) { + await w.u53(1); // parameter count + const value = new TextEncoder().encode(this.path); + await w.u53(PARAM_PATH); + await w.u53(value.byteLength); + await w.write(value); + } else { + await w.u53(0); // no parameters + } + } + + static async #decode(r: Reader): Promise { + const count = await r.u53(); + let path: string | undefined; + for (let i = 0; i < count; i++) { + const id = await r.u53(); + const len = await r.u53(); + const value = await r.read(len); + // Unknown parameters are ignored so new ones stay backward compatible. + if (id === PARAM_PATH) { + path = new TextDecoder().decode(value); + } + } + return new Setup({ path }); + } + + async encode(w: Writer, version: Version): Promise { + if (!hasSetupStream(version)) throw new Error("SETUP requires moq-lite-05+"); + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, version: Version): Promise { + if (!hasSetupStream(version)) throw new Error("SETUP requires moq-lite-05+"); + return Message.decode(r, Setup.#decode); + } +} diff --git a/js/net/src/lite/stream.ts b/js/net/src/lite/stream.ts index 71433a384..7c0a7469f 100644 --- a/js/net/src/lite/stream.ts +++ b/js/net/src/lite/stream.ts @@ -14,6 +14,15 @@ export const StreamId = { Fetch: 3, Probe: 4, Goaway: 5, + /// Queries a track's immutable publisher properties via TRACK/TRACK_INFO (moq-lite-05+). + Track: 6, ClientCompat: 0x20, ServerCompat: 0x21, } as const; + +/// Unidirectional (data) stream types. +export const DataId = { + Group: 0, + /// Carries a single SETUP message (moq-lite-05+). + Setup: 1, +} as const; diff --git a/js/net/src/lite/subscribe.ts b/js/net/src/lite/subscribe.ts index edd096646..4aa3fff22 100644 --- a/js/net/src/lite/subscribe.ts +++ b/js/net/src/lite/subscribe.ts @@ -198,6 +198,10 @@ export class SubscribeOk { case Version.DRAFT_01: await w.u8(this.priority ?? 0); break; + case Version.DRAFT_05_WIP: + // moq-lite-05+: just the resolved absolute start group (raw, 0 is valid). + await w.u53(this.startGroup ?? 0); + break; default: await w.u8(this.priority); await w.bool(this.ordered); @@ -222,6 +226,9 @@ export class SubscribeOk { case Version.DRAFT_01: priority = await r.u8(); break; + case Version.DRAFT_05_WIP: + // moq-lite-05+: just the resolved absolute start group (raw). + return new SubscribeOk({ startGroup: await r.u53() }); default: priority = await r.u8(); ordered = await r.bool(); @@ -282,17 +289,47 @@ export class SubscribeDrop { } } +/** + * Signals that no group after a given sequence will be produced. + * + * moq-lite-05+ only. + */ +export class SubscribeEnd { + group: number; + + constructor(props: { group: number }) { + this.group = props.group; + } + + async #encode(w: Writer) { + await w.u53(this.group); + } + + static async #decode(r: Reader): Promise { + return new SubscribeEnd({ group: await r.u53() }); + } + + async encode(w: Writer): Promise { + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader): Promise { + return Message.decode(r, SubscribeEnd.#decode); + } +} + /** * A response message on the subscribe stream. * - * In Draft03+, each response is prefixed with a type discriminator: - * - 0x0 for SUBSCRIBE_OK - * - 0x1 for SUBSCRIBE_DROP + * In Draft03/04 each response is prefixed with a type discriminator: 0x0 SUBSCRIBE_OK, + * 0x1 SUBSCRIBE_DROP. In Draft05 end-of-subscription splits out, so the discriminators + * become 0x0 SUBSCRIBE_OK, 0x1 SUBSCRIBE_END, 0x2 SUBSCRIBE_DROP. * - * SUBSCRIBE_OK must be the first message on the response stream. + * SUBSCRIBE_OK is normally the first message on the response stream. */ -export type SubscribeResponse = { ok: SubscribeOk } | { drop: SubscribeDrop }; +export type SubscribeResponse = { ok: SubscribeOk } | { end: SubscribeEnd } | { drop: SubscribeDrop }; +/** Encode a subscribe-stream response (OK / END / DROP) for the negotiated moq-lite version. */ export async function encodeSubscribeResponse(w: Writer, resp: SubscribeResponse, version: Version): Promise { switch (version) { case Version.DRAFT_01: @@ -300,26 +337,55 @@ export async function encodeSubscribeResponse(w: Writer, resp: SubscribeResponse if ("ok" in resp) { await resp.ok.encode(w, version); } else { - throw new Error("subscribe drop not supported for this version"); + throw new Error("only SUBSCRIBE_OK is supported for this version"); } break; - default: + case Version.DRAFT_05_WIP: + // moq-lite-05+: 0x0 OK, 0x1 END, 0x2 DROP. if ("ok" in resp) { await w.u53(0x0); await resp.ok.encode(w, version); + } else if ("end" in resp) { + await w.u53(0x1); + await resp.end.encode(w); } else { + await w.u53(0x2); + await resp.drop.encode(w); + } + break; + default: + // Draft03/04: 0x0 OK, 0x1 DROP; no SUBSCRIBE_END. + if ("ok" in resp) { + await w.u53(0x0); + await resp.ok.encode(w, version); + } else if ("drop" in resp) { await w.u53(0x1); await resp.drop.encode(w); + } else { + throw new Error("SUBSCRIBE_END is not supported for this version"); } break; } } -export async function decodeSubscribeResponse(r: Reader, version: Version): Promise { +async function decodeResponse(r: Reader, version: Version): Promise { switch (version) { case Version.DRAFT_01: case Version.DRAFT_02: return { ok: await SubscribeOk.decode(r, version) }; + case Version.DRAFT_05_WIP: { + const typ = await r.u53(); + switch (typ) { + case 0x0: + return { ok: await SubscribeOk.decode(r, version) }; + case 0x1: + return { end: await SubscribeEnd.decode(r) }; + case 0x2: + return { drop: await SubscribeDrop.decode(r) }; + default: + throw new Error(`unknown subscribe response type: ${typ}`); + } + } default: { const typ = await r.u53(); switch (typ) { @@ -333,3 +399,17 @@ export async function decodeSubscribeResponse(r: Reader, version: Version): Prom } } } + +/** Decode the next subscribe-stream response for the negotiated moq-lite version. */ +export async function decodeSubscribeResponse(r: Reader, version: Version): Promise { + return decodeResponse(r, version); +} + +/** Like {@link decodeSubscribeResponse} but returns `undefined` at a clean stream end (FIN). */ +export async function decodeSubscribeResponseMaybe( + r: Reader, + version: Version, +): Promise { + if (await r.done()) return undefined; + return decodeResponse(r, version); +} diff --git a/js/net/src/lite/subscriber.ts b/js/net/src/lite/subscriber.ts index 1e07befce..7f274ff92 100644 --- a/js/net/src/lite/subscriber.ts +++ b/js/net/src/lite/subscriber.ts @@ -13,8 +13,8 @@ import type { Group as GroupMessage } from "./group.ts"; import type { Origin } from "./origin.ts"; import { Probe } from "./probe.ts"; import { StreamId } from "./stream.ts"; -import { decodeSubscribeResponse, Subscribe, SubscribeUpdate } from "./subscribe.ts"; -import { Version } from "./version.ts"; +import { decodeSubscribeResponse, decodeSubscribeResponseMaybe, Subscribe, SubscribeUpdate } from "./subscribe.ts"; +import { hasTrackStream, Version } from "./version.ts"; /** * Options accepted by {@link Subscriber.announced}. @@ -195,7 +195,9 @@ export class Subscriber { // Watch for priority changes and send SUBSCRIBE_UPDATE. Lite01/Lite02 // don't carry SUBSCRIBE_UPDATE on the wire, so skip the watcher there // and just wait on the stream/track like before. - const waits: Promise[] = [stream.reader.closed, request.track.closed]; + // Draining responses also consumes any SUBSCRIBE_END / SUBSCRIBE_DROP and + // resolves when the publisher FINs. + const waits: Promise[] = [this.#drainResponses(stream), request.track.closed]; switch (this.version) { case Version.DRAFT_01: case Version.DRAFT_02: @@ -222,6 +224,20 @@ export class Subscriber { } } + /** + * Drain responses on the subscribe stream until the publisher FINs. + * + * The first SUBSCRIBE_OK is consumed by the caller; this reads any further + * SUBSCRIBE_END / SUBSCRIBE_DROP. They aren't acted on yet (groups arrive on + * their own streams regardless), but consuming them keeps the stream aligned. + */ + async #drainResponses(stream: Stream): Promise { + for (;;) { + const resp = await decodeSubscribeResponseMaybe(stream.reader, this.version); + if (!resp) break; + } + } + /** * Send SUBSCRIBE_UPDATE messages whenever the track's priority signal changes. * @@ -292,6 +308,12 @@ export class Subscriber { const done = await Promise.race([stream.done(), subscribe.closed, producer.closed]); if (done !== false) break; + if (hasTrackStream(this.version)) { + // moq-lite-05+ prefixes each frame with a zigzag timestamp delta. Decode it + // to stay aligned with the wire, but don't surface it yet. + await stream.u62(); + } + const size = await stream.u53(); const payload = await stream.read(size); if (!payload) break; diff --git a/js/net/src/lite/track.ts b/js/net/src/lite/track.ts new file mode 100644 index 000000000..d9dc5bf4e --- /dev/null +++ b/js/net/src/lite/track.ts @@ -0,0 +1,87 @@ +import * as Path from "../path.ts"; +import type { Reader, Writer } from "../stream.ts"; +import * as Message from "./message.ts"; +import { hasTrackStream, type Version } from "./version.ts"; + +/** + * Sent by a subscriber on a Track stream (moq-lite-05+) to request a track's + * immutable publisher properties without subscribing or fetching. + */ +export class Track { + broadcast: Path.Valid; + track: string; + + constructor(props: { broadcast: Path.Valid; track: string }) { + this.broadcast = props.broadcast; + this.track = props.track; + } + + async #encode(w: Writer) { + await w.string(this.broadcast); + await w.string(this.track); + } + + static async #decode(r: Reader): Promise { + const broadcast = Path.from(await r.string()); + const track = await r.string(); + return new Track({ broadcast, track }); + } + + async encode(w: Writer, version: Version): Promise { + if (!hasTrackStream(version)) throw new Error("TRACK requires moq-lite-05+"); + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, version: Version): Promise { + if (!hasTrackStream(version)) throw new Error("TRACK requires moq-lite-05+"); + return Message.decode(r, Track.#decode); + } +} + +/** + * The publisher's reply on a Track stream (moq-lite-05+): the track's immutable + * properties. Sent once, then the publisher FINs the stream. + */ +export class TrackInfo { + priority: number; + ordered: boolean; + maxLatency: number; + timescale: number; + + constructor(props: { priority?: number; ordered?: boolean; maxLatency?: number; timescale: number }) { + this.priority = props.priority ?? 0; + this.ordered = props.ordered ?? false; + this.maxLatency = props.maxLatency ?? 0; + this.timescale = props.timescale; + } + + async #encode(w: Writer) { + await w.u8(this.priority); + await w.bool(this.ordered); + await w.u53(this.maxLatency); + await w.u53(this.timescale); + } + + static async #decode(r: Reader): Promise { + const priority = await r.u8(); + const ordered = await r.bool(); + const maxLatency = await r.u53(); + const timescale = await r.u53(); + // A zero timescale is a protocol violation: every track has a media timeline. + if (timescale === 0) throw new Error("TRACK_INFO timescale must be non-zero"); + return new TrackInfo({ priority, ordered, maxLatency, timescale }); + } + + async encode(w: Writer, version: Version): Promise { + if (!hasTrackStream(version)) throw new Error("TRACK_INFO requires moq-lite-05+"); + // Reject a zero timescale on encode too (mirrors the Rust side), so an invalid + // TrackInfo fails fast on the sender rather than only at the peer's decoder. + if (this.timescale === 0) throw new Error("TRACK_INFO timescale must be non-zero"); + return Message.encode(w, this.#encode.bind(this)); + } + + static async decode(r: Reader, version: Version): Promise { + if (!hasTrackStream(version)) throw new Error("TRACK_INFO requires moq-lite-05+"); + return Message.decode(r, TrackInfo.#decode); + } +} diff --git a/js/net/src/lite/version.ts b/js/net/src/lite/version.ts index a3beda5f5..f131ed0b1 100644 --- a/js/net/src/lite/version.ts +++ b/js/net/src/lite/version.ts @@ -35,3 +35,15 @@ const VERSION_NAMES: Record = { export function versionName(v: Version): string { return VERSION_NAMES[v] ?? `unknown(0x${v.toString(16)})`; } + +/// Whether this version uses a unidirectional Setup stream (moq-lite-05+). +export function hasSetupStream(v: Version): boolean { + return v === Version.DRAFT_05_WIP; +} + +/// Whether this version uses the moq-lite-05 framing bundle: the Track stream +/// (TRACK/TRACK_INFO), a SUBSCRIBE_OK trimmed to the resolved start group plus a +/// distinct SUBSCRIBE_END, and a per-frame timestamp delta. +export function hasTrackStream(v: Version): boolean { + return v === Version.DRAFT_05_WIP; +} diff --git a/rs/moq-net/src/lite/mod.rs b/rs/moq-net/src/lite/mod.rs index 46b694532..aa8c462c7 100644 --- a/rs/moq-net/src/lite/mod.rs +++ b/rs/moq-net/src/lite/mod.rs @@ -19,6 +19,7 @@ mod setup; mod stream; mod subscribe; mod subscriber; +mod track; mod version; pub use announce::*; @@ -38,4 +39,6 @@ pub(super) use setup::{accept_setup, send_setup}; pub use stream::*; pub use subscribe::*; use subscriber::*; +#[allow(unused_imports)] +pub use track::*; pub use version::Version; diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index 4c0ef8f9e..882d80d8c 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -72,6 +72,7 @@ impl Publisher { if let Err(err) = match kind { lite::ControlType::Announce => self.recv_announce(stream).await, lite::ControlType::Subscribe => self.recv_subscribe(stream).await, + lite::ControlType::Track => self.recv_track(stream).await, lite::ControlType::Probe => { self.recv_probe(stream); Ok(()) @@ -398,6 +399,50 @@ impl Publisher { Ok(()) } + /// Serve a Track stream (moq-lite-05+): reply with the track's immutable TRACK_INFO. + pub async fn recv_track(&mut self, mut stream: Stream) -> Result<(), Error> { + let msg = stream.reader.decode::().await?; + + let absolute = self.origin.absolute(&msg.broadcast).to_owned(); + let name = msg.track.to_string(); + tracing::debug!(broadcast = %absolute, track = %name, "track info requested"); + + let broadcast = self.origin.get_broadcast(&msg.broadcast); + + web_async::spawn(async move { + if let Err(err) = Self::run_track_info(&mut stream, broadcast, name).await { + tracing::debug!(broadcast = %absolute, %err, "track info error"); + stream.writer.abort(&err); + } + }); + + Ok(()) + } + + async fn run_track_info( + stream: &mut Stream, + consumer: Option, + name: String, + ) -> Result<(), Error> { + let broadcast = consumer.ok_or(Error::NotFound)?; + // Resolve the track to read its publisher properties. The query carries no + // priority; a reused producer reports its own authored value. + let track = broadcast.subscribe_track(&Track { name, priority: 0 })?; + + // ordered/max_latency aren't tracked in the model yet; the timescale is + // milliseconds to match the wall-clock frame timestamps we stamp on the wire. + let info = lite::TrackInfo { + priority: track.priority, + ordered: false, + max_latency: Duration::ZERO, + timescale: 1000, + }; + + stream.writer.encode(&info).await?; + stream.writer.finish()?; + stream.writer.closed().await + } + async fn run_subscribe( session: S, stream: &mut Stream, @@ -417,20 +462,40 @@ impl Publisher { }; let broadcast = consumer.ok_or(Error::NotFound)?; - let track = broadcast.subscribe_track(&track)?; + let mut track = broadcast.subscribe_track(&track)?; // Subscription is now active: count this session as a viewer of the // broadcast. Dropping this guard (subscription end) releases it. let _broadcast_sub = broadcasts.subscribe(&absolute); - // TODO wait until track.info() to get the *real* priority + // Resolve the absolute start group once: a non-zero request wins, otherwise the + // latest group (or 0 for a track with none yet). The same value is advertised in + // SUBSCRIBE_OK and used to position the track here, so the acknowledged start can't + // diverge from the served one if a new group arrives in between. + let resolved_start = subscribe.start_group.or_else(|| track.latest()); + let start_group = resolved_start.unwrap_or(0); + if let Some(start) = resolved_start { + track.start_at(start); + } - let info = lite::SubscribeOk { - priority: track.priority, - ordered: false, - max_latency: std::time::Duration::ZERO, - start_group: None, - end_group: None, + let info = if version.has_track_stream() { + // moq-lite-05+: SUBSCRIBE_OK carries only the resolved start group; the + // publisher properties live in TRACK_INFO on the Track stream. + lite::SubscribeOk { + priority: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: Some(start_group), + end_group: None, + } + } else { + lite::SubscribeOk { + priority: track.priority, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: None, + end_group: None, + } }; stream.writer.encode(&lite::SubscribeResponse::Ok(info)).await?; @@ -441,9 +506,22 @@ impl Publisher { let (track_priority_tx, track_priority_rx) = tokio::sync::watch::channel(track.priority); let track_stats = std::sync::Arc::new(track_stats); - tokio::select! { - res = Self::run_track(session, track, subscribe, priority, track_stats, track_priority_rx, version) => res?, - res = Self::run_subscribe_updates(&mut stream.reader, &track_priority_tx) => res?, + // `Some(last_group)` means the track ended (and we owe a SUBSCRIBE_END); `None` + // means the subscriber tore down the stream first, so no end signal is owed. + let ended = tokio::select! { + res = Self::run_track(session, track, subscribe, priority, track_stats, track_priority_rx, version) => Some(res?), + res = Self::run_subscribe_updates(&mut stream.reader, &track_priority_tx) => { res?; None } + }; + + // moq-lite-05+: signal end-of-track before FIN once no further groups will be produced. + if version.has_track_stream() + && let Some(last) = ended + { + let group = last.unwrap_or(start_group); + stream + .writer + .encode(&lite::SubscribeResponse::End(lite::SubscribeEnd { group })) + .await?; } stream.writer.finish()?; @@ -468,13 +546,12 @@ impl Publisher { track_stats: std::sync::Arc, mut track_priority: tokio::sync::watch::Receiver, version: Version, - ) -> Result<(), Error> { + ) -> Result, Error> { let mut tasks = FuturesUnordered::new(); - // Start the consumer at the specified sequence, otherwise start at the latest group. - if let Some(start_group) = subscribe.start_group.or_else(|| track.latest()) { - track.start_at(start_group); - } + // Highest group sequence handed to a Group stream, reported in SUBSCRIBE_END (moq-lite-05+). + // The consumer was already positioned by `run_subscribe` from the resolved start group. + let mut last_sequence: Option = None; loop { let group = tokio::select! { @@ -484,10 +561,11 @@ impl Publisher { false } => unreachable!(), Some(group) = track.recv_group().transpose() => group, - else => return Ok(()), + else => return Ok(last_sequence), }?; let sequence = group.sequence; + last_sequence = last_sequence.max(Some(sequence)); tracing::debug!(subscribe = %subscribe.id, track = %track.name, sequence, "serving group"); let msg = lite::Group { @@ -530,6 +608,10 @@ impl Publisher { stream.encode(&msg).await?; track_stats.group(); + // moq-lite-05+ stamps each frame with a wall-clock millisecond timestamp, sent as a + // zigzag delta from the previous frame (the first frame is a delta from 0). + let mut prev_timestamp: i64 = 0; + loop { let frame = tokio::select! { biased; @@ -550,6 +632,14 @@ impl Publisher { None => break, }; + if version.has_track_stream() { + let now = i64::try_from(crate::Time::now().as_millis()).unwrap_or(i64::MAX); + let delta = now - prev_timestamp; + prev_timestamp = now; + let zigzag = ((delta << 1) ^ (delta >> 63)) as u64; + stream.encode(&zigzag).await?; + } + stream.encode(&frame.size).await?; track_stats.frame(); diff --git a/rs/moq-net/src/lite/stream.rs b/rs/moq-net/src/lite/stream.rs index 22a5cc539..dea0390ca 100644 --- a/rs/moq-net/src/lite/stream.rs +++ b/rs/moq-net/src/lite/stream.rs @@ -13,6 +13,8 @@ pub enum ControlType { Fetch = 3, Probe = 4, Goaway = 5, + /// Queries a track's immutable publisher properties via TRACK/TRACK_INFO (moq-lite-05+). + Track = 6, } impl Decode for ControlType { diff --git a/rs/moq-net/src/lite/subscribe.rs b/rs/moq-net/src/lite/subscribe.rs index a4966dde5..2443cf3d8 100644 --- a/rs/moq-net/src/lite/subscribe.rs +++ b/rs/moq-net/src/lite/subscribe.rs @@ -72,6 +72,12 @@ impl Message for Subscribe<'_> { } } +/// The publisher's positive response to a SUBSCRIBE. +/// +/// In moq-lite-05 this is trimmed to the resolved absolute start group; the publisher's +/// per-track properties moved to [`TrackInfo`](super::TrackInfo). The resolved group lives +/// in `start_group` as a raw absolute sequence (not the `+1`-encoded form used in the +/// SUBSCRIBE request). Earlier versions carry the publisher properties inline. #[derive(Clone, Debug)] pub struct SubscribeOk { pub priority: u8, @@ -88,13 +94,17 @@ impl Message for SubscribeOk { self.priority.encode(w, version)?; } Version::Lite02 => {} - _ => { + Version::Lite03 | Version::Lite04 => { self.priority.encode(w, version)?; (self.ordered as u8).encode(w, version)?; self.max_latency.encode(w, version)?; self.start_group.encode(w, version)?; self.end_group.encode(w, version)?; } + // moq-lite-05+: just the resolved absolute start group (raw, 0 is valid). + _ => { + self.start_group.unwrap_or(0).encode(w, version)?; + } } Ok(()) @@ -116,7 +126,7 @@ impl Message for SubscribeOk { start_group: None, end_group: None, }), - _ => { + Version::Lite03 | Version::Lite04 => { let priority = u8::decode(r, version)?; let ordered = u8::decode(r, version)? != 0; let max_latency = std::time::Duration::decode(r, version)?; @@ -131,7 +141,46 @@ impl Message for SubscribeOk { end_group, }) } + // moq-lite-05+: just the resolved absolute start group. + _ => Ok(Self { + priority: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: Some(u64::decode(r, version)?), + end_group: None, + }), + } + } +} + +/// Sent by the publisher to signal that no group after a given sequence will be produced. +/// +/// moq-lite-05+ only. Bounds the subscription range; the stream still FINs only once every +/// group up to this sequence has been accounted for. +#[derive(Clone, Debug)] +pub struct SubscribeEnd { + /// The absolute sequence of the last group that may be delivered (inclusive). + pub group: u64, +} + +impl Message for SubscribeEnd { + fn decode_msg(r: &mut R, version: Version) -> Result { + if !version.has_track_stream() { + return Err(DecodeError::Version); + } + + Ok(Self { + group: u64::decode(r, version)?, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + if !version.has_track_stream() { + return Err(EncodeError::Version); } + + self.group.encode(w, version)?; + Ok(()) } } @@ -263,46 +312,59 @@ impl Message for SubscribeDrop { /// A response message on the subscribe stream. /// -/// In Draft03, each response is prefixed with a type discriminator: -/// - 0x0 for SUBSCRIBE_OK -/// - 0x1 for SUBSCRIBE_DROP +/// In Lite03/04 each response is prefixed with a type discriminator: 0x0 SUBSCRIBE_OK, +/// 0x1 SUBSCRIBE_DROP. In Lite05 end-of-subscription splits out, so the discriminators +/// become 0x0 SUBSCRIBE_OK, 0x1 SUBSCRIBE_END, 0x2 SUBSCRIBE_DROP. /// -/// SUBSCRIBE_OK must be the first message on the response stream. +/// SUBSCRIBE_OK is normally the first message on the response stream. #[derive(Clone, Debug)] pub enum SubscribeResponse { Ok(SubscribeOk), + End(SubscribeEnd), Drop(SubscribeDrop), } +/// Encode a size-prefixed message body (no type discriminator). +fn encode_body(msg: &M, w: &mut W, version: Version) -> Result<(), EncodeError> { + let mut sizer = Sizer::default(); + Message::encode_msg(msg, &mut sizer, version)?; + sizer.size.encode(w, version)?; + Message::encode_msg(msg, w, version) +} + impl Encode for SubscribeResponse { fn encode(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { match version { + // No type discriminator; only SUBSCRIBE_OK exists. Version::Lite01 | Version::Lite02 => match self { + Self::Ok(ok) => encode_body(ok, w, version)?, + Self::End(_) | Self::Drop(_) => return Err(EncodeError::Version), + }, + // 0x0 OK, 0x1 DROP; no SUBSCRIBE_END. + Version::Lite03 | Version::Lite04 => match self { Self::Ok(ok) => { - let mut sizer = Sizer::default(); - Message::encode_msg(ok, &mut sizer, version)?; - sizer.size.encode(w, version)?; - Message::encode_msg(ok, w, version)?; + 0u64.encode(w, version)?; + encode_body(ok, w, version)?; } - Self::Drop(_) => { - return Err(EncodeError::Version); + Self::Drop(drop) => { + 1u64.encode(w, version)?; + encode_body(drop, w, version)?; } + Self::End(_) => return Err(EncodeError::Version), }, + // moq-lite-05+: 0x0 OK, 0x1 END, 0x2 DROP. _ => match self { Self::Ok(ok) => { 0u64.encode(w, version)?; - // Write size-prefixed body using Message trait - let mut sizer = Sizer::default(); - Message::encode_msg(ok, &mut sizer, version)?; - sizer.size.encode(w, version)?; - Message::encode_msg(ok, w, version)?; + encode_body(ok, w, version)?; } - Self::Drop(drop) => { + Self::End(end) => { 1u64.encode(w, version)?; - let mut sizer = Sizer::default(); - Message::encode_msg(drop, &mut sizer, version)?; - sizer.size.encode(w, version)?; - Message::encode_msg(drop, w, version)?; + encode_body(end, w, version)?; + } + Self::Drop(drop) => { + 2u64.encode(w, version)?; + encode_body(drop, w, version)?; } }, } @@ -315,7 +377,7 @@ impl Decode for SubscribeResponse { fn decode(buf: &mut B, version: Version) -> Result { match version { Version::Lite01 | Version::Lite02 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), - _ => { + Version::Lite03 | Version::Lite04 => { let typ = u64::decode(buf, version)?; match typ { 0 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), @@ -323,6 +385,94 @@ impl Decode for SubscribeResponse { _ => Err(DecodeError::InvalidMessage(typ)), } } + _ => { + let typ = u64::decode(buf, version)?; + match typ { + 0 => Ok(Self::Ok(SubscribeOk::decode(buf, version)?)), + 1 => Ok(Self::End(SubscribeEnd::decode(buf, version)?)), + 2 => Ok(Self::Drop(SubscribeDrop::decode(buf, version)?)), + _ => Err(DecodeError::InvalidMessage(typ)), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + fn response_roundtrip(resp: &SubscribeResponse, version: Version) -> SubscribeResponse { + let mut buf = BytesMut::new(); + resp.encode(&mut buf, version).unwrap(); + let mut slice = &buf[..]; + let decoded = SubscribeResponse::decode(&mut slice, version).unwrap(); + assert!(bytes::Buf::remaining(&slice) == 0, "trailing bytes after decode"); + decoded + } + + #[test] + fn lite05_subscribe_ok_resolved_group() { + let resp = SubscribeResponse::Ok(SubscribeOk { + priority: 0, + ordered: false, + max_latency: std::time::Duration::ZERO, + start_group: Some(42), + end_group: None, + }); + match response_roundtrip(&resp, Version::Lite05Wip) { + SubscribeResponse::Ok(ok) => assert_eq!(ok.start_group, Some(42)), + other => panic!("expected Ok, got {other:?}"), + } + } + + #[test] + fn lite05_subscribe_end() { + let resp = SubscribeResponse::End(SubscribeEnd { group: 7 }); + match response_roundtrip(&resp, Version::Lite05Wip) { + SubscribeResponse::End(end) => assert_eq!(end.group, 7), + other => panic!("expected End, got {other:?}"), + } + } + + #[test] + fn lite05_subscribe_drop() { + let resp = SubscribeResponse::Drop(SubscribeDrop { + start: 1, + end: 3, + error: 0, + }); + match response_roundtrip(&resp, Version::Lite05Wip) { + SubscribeResponse::Drop(d) => { + assert_eq!(d.start, 1); + assert_eq!(d.end, 3); + } + other => panic!("expected Drop, got {other:?}"), + } + } + + #[test] + fn lite04_has_no_subscribe_end() { + let resp = SubscribeResponse::End(SubscribeEnd { group: 5 }); + let mut buf = BytesMut::new(); + assert!(matches!( + resp.encode(&mut buf, Version::Lite04), + Err(EncodeError::Version) + )); + } + + #[test] + fn lite04_drop_uses_type_1() { + // In lite-04 DROP is discriminator 1; lite-05 reassigns 1 to END. + let resp = SubscribeResponse::Drop(SubscribeDrop { + start: 2, + end: 2, + error: 9, + }); + match response_roundtrip(&resp, Version::Lite04) { + SubscribeResponse::Drop(d) => assert_eq!(d.error, 9), + other => panic!("expected Drop, got {other:?}"), } } } diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index d6503f15a..7be737071 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -467,8 +467,9 @@ impl Subscriber { let abs = self.origin.as_ref().unwrap().absolute(&msg.broadcast); let _broadcast_sub = self.broadcasts.subscribe(&abs); - // TODO handle additional SUBSCRIBE_OK and SUBSCRIBE_DROP messages. - stream.reader.closed().await?; + // Drain any further responses (SUBSCRIBE_END / SUBSCRIBE_DROP) until the publisher FINs. + // We don't act on them yet; groups arrive on their own streams regardless. + while stream.reader.decode_maybe::().await?.is_some() {} Ok(()) } @@ -530,7 +531,21 @@ impl Subscriber { mut group: GroupProducer, track_stats: Arc, ) -> Result<(), Error> { - while let Some(size) = stream.decode_maybe::().await? { + loop { + let size = if self.version.has_track_stream() { + // moq-lite-05+: each frame is prefixed with a zigzag timestamp delta. We + // decode it to stay aligned with the wire, but don't surface it yet. + let Some(_timestamp_delta) = stream.decode_maybe::().await? else { + break; + }; + stream.decode::().await? + } else { + let Some(size) = stream.decode_maybe::().await? else { + break; + }; + size + }; + if size > MAX_FRAME_SIZE { return Err(Error::FrameTooLarge); } diff --git a/rs/moq-net/src/lite/track.rs b/rs/moq-net/src/lite/track.rs new file mode 100644 index 000000000..048acb805 --- /dev/null +++ b/rs/moq-net/src/lite/track.rs @@ -0,0 +1,169 @@ +use std::borrow::Cow; +use std::time::Duration; + +use crate::{ + Path, + coding::{Decode, DecodeError, Encode, EncodeError}, +}; + +use super::{Message, Version}; + +/// Sent by a subscriber on a Track stream (moq-lite-05+) to request a track's +/// immutable publisher properties without subscribing or fetching. +#[derive(Clone, Debug)] +pub struct Track<'a> { + /// The broadcast path of the track. + pub broadcast: Path<'a>, + /// The name of the track within the broadcast. + pub track: Cow<'a, str>, +} + +impl Message for Track<'_> { + fn decode_msg(r: &mut R, version: Version) -> Result { + if !version.has_track_stream() { + return Err(DecodeError::Version); + } + + Ok(Self { + broadcast: Path::decode(r, version)?, + track: Cow::::decode(r, version)?, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + if !version.has_track_stream() { + return Err(EncodeError::Version); + } + + self.broadcast.encode(w, version)?; + self.track.encode(w, version)?; + Ok(()) + } +} + +/// The publisher's reply on a Track stream (moq-lite-05+): the track's immutable +/// properties. Sent once, then the publisher FINs the stream. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TrackInfo { + /// The publisher's priority for this track, used only to break ties. + pub priority: u8, + /// The publisher's group ordering preference (ascending if true), used only to break ties. + pub ordered: bool, + /// The longest the publisher caches a non-latest group past the arrival of a newer one. + pub max_latency: Duration, + /// Timestamp units per second for this track's frame timestamps. Always non-zero. + pub timescale: u64, +} + +impl Message for TrackInfo { + fn decode_msg(r: &mut R, version: Version) -> Result { + if !version.has_track_stream() { + return Err(DecodeError::Version); + } + + let priority = u8::decode(r, version)?; + let ordered = u8::decode(r, version)? != 0; + let max_latency = Duration::decode(r, version)?; + let timescale = u64::decode(r, version)?; + + // A zero timescale is a protocol violation: every track has a media timeline. + if timescale == 0 { + return Err(DecodeError::InvalidValue); + } + + Ok(Self { + priority, + ordered, + max_latency, + timescale, + }) + } + + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + if !version.has_track_stream() { + return Err(EncodeError::Version); + } + + if self.timescale == 0 { + return Err(EncodeError::InvalidState); + } + + self.priority.encode(w, version)?; + (self.ordered as u8).encode(w, version)?; + self.max_latency.encode(w, version)?; + self.timescale.encode(w, version)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::coding::{Decode, Encode}; + use bytes::BytesMut; + + fn roundtrip(info: &TrackInfo) -> TrackInfo { + let mut buf = BytesMut::new(); + info.encode(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = &buf[..]; + let decoded = TrackInfo::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert!(bytes::Buf::remaining(&slice) == 0, "trailing bytes after decode"); + decoded + } + + #[test] + fn track_info_roundtrip() { + let info = TrackInfo { + priority: 7, + ordered: true, + max_latency: Duration::from_millis(2500), + timescale: 1000, + }; + assert_eq!(roundtrip(&info), info); + } + + #[test] + fn rejects_zero_timescale() { + let mut buf = BytesMut::new(); + assert!(matches!( + TrackInfo { + priority: 0, + ordered: false, + max_latency: Duration::ZERO, + timescale: 0, + } + .encode(&mut buf, Version::Lite05Wip), + Err(EncodeError::InvalidState) + )); + } + + #[test] + fn track_roundtrip() { + let track = Track { + broadcast: Path::new("room/1"), + track: Cow::Borrowed("video"), + }; + let mut buf = BytesMut::new(); + track.encode(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = &buf[..]; + let decoded = Track::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert!(bytes::Buf::remaining(&slice) == 0, "trailing bytes after decode"); + assert_eq!(decoded.broadcast, track.broadcast); + assert_eq!(decoded.track, track.track); + } + + #[test] + fn rejects_before_lite05() { + let mut buf = BytesMut::new(); + assert!(matches!( + TrackInfo { + priority: 0, + ordered: false, + max_latency: Duration::ZERO, + timescale: 1000, + } + .encode(&mut buf, Version::Lite04), + Err(EncodeError::Version) + )); + } +} diff --git a/rs/moq-net/src/lite/version.rs b/rs/moq-net/src/lite/version.rs index 73d5f4639..bf6d2f08e 100644 --- a/rs/moq-net/src/lite/version.rs +++ b/rs/moq-net/src/lite/version.rs @@ -19,6 +19,14 @@ impl Version { pub fn has_setup_stream(self) -> bool { matches!(self, Self::Lite05Wip) } + + /// Whether this version uses the moq-lite-05 framing bundle: the Track stream + /// (TRACK/TRACK_INFO), a SUBSCRIBE_OK trimmed to the resolved start group plus a + /// distinct SUBSCRIBE_END, and a per-frame timestamp delta. Earlier versions carry + /// publisher properties inline in SUBSCRIBE_OK and frames without a timestamp. + pub fn has_track_stream(self) -> bool { + matches!(self, Self::Lite05Wip) + } } impl fmt::Display for Version {