Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions js/net/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === Lite.ALPN_05_WIP) {
// moq-lite draft-05 exchanges SETUP on a dedicated unidirectional stream
// (handled inside Connection), not the session compat stream.
return new Lite.Connection(url, session, Lite.Version.DRAFT_05_WIP, undefined);
} else if (protocol === Lite.ALPN_04) {
// moq-lite draft-04 doesn't use a session stream, so we return immediately.
return new Lite.Connection(url, session, Lite.Version.DRAFT_04, undefined);
Expand Down Expand Up @@ -223,6 +227,8 @@ async function connectTransport(url: URL, session: WebTransport): Promise<Establ
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === Lite.ALPN_05_WIP) {
return new Lite.Connection(url, session, Lite.Version.DRAFT_05_WIP, undefined);
} else if (protocol === Lite.ALPN_04) {
return new Lite.Connection(url, session, Lite.Version.DRAFT_04, undefined);
} else if (protocol === Lite.ALPN_03) {
Expand Down
35 changes: 31 additions & 4 deletions js/net/src/lite/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
import type { Broadcast } from "../broadcast.ts";
import type { Established } from "../connection/established.ts";
import * as Path from "../path.ts";
import { type Reader, Readers, Stream } from "../stream.ts";
import { type Reader, Readers, Stream, Writer } from "../stream.ts";
import type * as Time from "../time.ts";
import { AnnounceInterest } from "./announce.ts";
import { Goaway } from "./goaway.ts";
import { Group } from "./group.ts";
import { type Origin, randomOrigin } from "./origin.ts";
import { Publisher } from "./publisher.ts";
import { SessionInfo } from "./session.ts";
import { StreamId } from "./stream.ts";
import { Setup } from "./setup.ts";
import { DataId, StreamId } from "./stream.ts";
import { Subscribe } from "./subscribe.ts";
import { Subscriber } from "./subscriber.ts";
import { Version, versionName } from "./version.ts";
import { Track } from "./track.ts";
import { hasSetupStream, Version, versionName } from "./version.ts";

const SEND_BW_POLL_INTERVAL = 100; // ms

Expand Down Expand Up @@ -114,6 +116,11 @@ export class Connection implements Established {
async #run(): Promise<void> {
const tasks: Promise<void>[] = [this.#runSession(), this.#runBidis(), this.#runUnis()];

// moq-lite-05+: each endpoint opens a Setup stream and sends a single SETUP message.
if (hasSetupStream(this.#version)) {
tasks.push(this.#runSetup());
}

if (this.sendBandwidth) {
tasks.push(this.#runSendBandwidth(this.sendBandwidth));
}
Expand Down Expand Up @@ -184,6 +191,9 @@ export class Connection implements Established {
} else if (typ === StreamId.Subscribe) {
const msg = await Subscribe.decode(stream.reader, this.#version);
await this.#publisher.runSubscribe(msg, stream);
} else if (typ === StreamId.Track) {
const msg = await Track.decode(stream.reader, this.#version);
await this.#publisher.runTrackInfo(msg, stream);
} else if (typ === StreamId.Probe) {
await this.#publisher.runProbe(stream);
} else if (typ === StreamId.Goaway) {
Expand Down Expand Up @@ -213,14 +223,31 @@ export class Connection implements Established {

async #runUni(stream: Reader) {
const typ = await stream.u8();
if (typ === 0) {
if (typ === DataId.Group) {
const msg = await Group.decode(stream);
await this.#subscriber.runGroup(msg, stream);
} else if (typ === DataId.Setup) {
// moq-lite-05+: read the peer's SETUP and discard it (the path, if any,
// is handled at accept time by the server's transport binding).
await Setup.decode(stream, this.#version);
} else {
throw new Error(`unknown stream type: ${typ.toString()}`);
}
}

// moq-lite-05+: open a unidirectional Setup stream, send a single SETUP, and FIN.
// The browser conveys its path via the WebTransport URL, so the SETUP is empty.
async #runSetup(): Promise<void> {
const writer = await Writer.open(this.#quic);
try {
await writer.u8(DataId.Setup);
await new Setup().encode(writer, this.#version);
writer.close();
} catch (err: unknown) {
writer.reset(err);
}
}

async #runSendBandwidth(bandwidth: Bandwidth): Promise<void> {
const quic = this.#quic as unknown as {
getStats: () => Promise<{ estimatedSendRate: number | null }>;
Expand Down
63 changes: 57 additions & 6 deletions js/net/src/lite/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import { Announce, AnnounceInit, type AnnounceInterest } from "./announce.ts";
import { Group as GroupMessage } from "./group.ts";
import type { Origin } from "./origin.ts";
import { Probe } from "./probe.ts";
import { encodeSubscribeResponse, type Subscribe, SubscribeOk, SubscribeUpdate } from "./subscribe.ts";
import { Version } from "./version.ts";
import { encodeSubscribeResponse, type Subscribe, SubscribeEnd, SubscribeOk, SubscribeUpdate } from "./subscribe.ts";
import type { Track as TrackMessage } from "./track.ts";
import { TrackInfo } from "./track.ts";
import { hasTrackStream, Version } from "./version.ts";

const PROBE_INTERVAL = 100; // ms
const PROBE_MAX_AGE = 10_000; // ms
Expand Down Expand Up @@ -170,7 +172,11 @@ export class Publisher {
const track = broadcast.subscribe(msg.track, msg.priority);

try {
const info = new SubscribeOk({ priority: msg.priority });
// moq-lite-05+ trims SUBSCRIBE_OK to the resolved start group; the publisher
// properties move to TRACK_INFO. Earlier versions echo the priority inline.
const info = hasTrackStream(this.version)
? new SubscribeOk({ startGroup: msg.startGroup ?? 0 })
: new SubscribeOk({ priority: msg.priority });
Comment thread
kixelated marked this conversation as resolved.
await encodeSubscribeResponse(stream.writer, { ok: info }, this.version);

console.debug(`publish ok: broadcast=${msg.broadcast} track=${track.name}`);
Expand Down Expand Up @@ -202,6 +208,23 @@ export class Publisher {
}
}

/**
* Serves a Track stream (moq-lite-05+): replies with the track's immutable TRACK_INFO.
*
* @internal
*/
async runTrackInfo(msg: TrackMessage, stream: Stream) {
const broadcast = this.#broadcasts.peek()?.get(msg.broadcast);
if (!broadcast) {
throw new Error("not found");
}

// The publisher properties aren't tracked in the browser model yet; report
// defaults with a millisecond timescale matching the wall-clock frame timestamps.
const info = new TrackInfo({ timescale: 1000 });
await info.encode(stream.writer, this.version);
}

/**
* Runs a track and sends its data to the stream.
* @param sub - The subscription ID
Expand All @@ -213,15 +236,32 @@ export class Publisher {
*/
async #runTrack(sub: bigint, broadcast: Path.Valid, track: Track, stream: Writer) {
try {
let lastSequence: number | undefined;
let trackEnded = false;

for (;;) {
const next = track.recvGroup();
const group = await Promise.race([next, stream.closed]);
if (!group) {
const res = await Promise.race([
next.then((group) => ({ group })),
stream.closed.then(() => "closed" as const),
]);
if (res === "closed") {
next.then((group) => group?.close()).catch(() => {});
break;
}
if (!res.group) {
trackEnded = true;
break;
}

void this.#runGroup(sub, group);
lastSequence = res.group.sequence;
void this.#runGroup(sub, res.group);
}

// moq-lite-05+: signal end-of-track before the subscribe stream FINs.
if (trackEnded && hasTrackStream(this.version)) {
const end = new SubscribeEnd({ group: lastSequence ?? 0 });
await encodeSubscribeResponse(stream, { end }, this.version);
}

console.debug(`publish close: broadcast=${broadcast} track=${track.name}`);
Expand Down Expand Up @@ -250,10 +290,21 @@ export class Publisher {
await msg.encode(stream);

try {
// moq-lite-05+ stamps each frame with a wall-clock millisecond timestamp, sent
// as a zigzag delta from the previous frame (the first frame is a delta from 0).
let prevTimestamp = 0n;

for (;;) {
const frame = await Promise.race([group.readFrame(), stream.closed]);
if (!frame) break;

if (hasTrackStream(this.version)) {
const now = BigInt(Date.now());
const delta = now - prevTimestamp;
prevTimestamp = now;
await stream.u62((delta << 1n) ^ (delta >> 63n));
}

await stream.u53(frame.byteLength);
await stream.write(frame);
}
Expand Down
58 changes: 58 additions & 0 deletions js/net/src/lite/setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import type { Reader, Writer } from "../stream.ts";
import * as Message from "./message.ts";
import { hasSetupStream, type Version } from "./version.ts";

/// Setup parameter ID for the request Path (client-only, on URI-less transports).
const PARAM_PATH = 0x2;

/**
* The moq-lite-05 SETUP message, sent once per endpoint on a unidirectional Setup stream.
*
* The browser conveys its path via the WebTransport URL, so it never sends a Path
* parameter; this class still parses one for completeness when reading a peer's SETUP.
*/
export class Setup {
/// The request path, only sent on transport bindings without a request URI.
path?: string;

constructor(props: { path?: string } = {}) {
this.path = props.path;
}

async #encode(w: Writer) {
if (this.path !== undefined) {
await w.u53(1); // parameter count
const value = new TextEncoder().encode(this.path);
await w.u53(PARAM_PATH);
await w.u53(value.byteLength);
await w.write(value);
} else {
await w.u53(0); // no parameters
}
}

static async #decode(r: Reader): Promise<Setup> {
const count = await r.u53();
let path: string | undefined;
for (let i = 0; i < count; i++) {
const id = await r.u53();
const len = await r.u53();
const value = await r.read(len);
// Unknown parameters are ignored so new ones stay backward compatible.
if (id === PARAM_PATH) {
path = new TextDecoder().decode(value);
}
}
return new Setup({ path });
}

async encode(w: Writer, version: Version): Promise<void> {
if (!hasSetupStream(version)) throw new Error("SETUP requires moq-lite-05+");
return Message.encode(w, this.#encode.bind(this));
}

static async decode(r: Reader, version: Version): Promise<Setup> {
if (!hasSetupStream(version)) throw new Error("SETUP requires moq-lite-05+");
return Message.decode(r, Setup.#decode);
}
}
9 changes: 9 additions & 0 deletions js/net/src/lite/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ export const StreamId = {
Fetch: 3,
Probe: 4,
Goaway: 5,
/// Queries a track's immutable publisher properties via TRACK/TRACK_INFO (moq-lite-05+).
Track: 6,
ClientCompat: 0x20,
ServerCompat: 0x21,
} as const;

/// Unidirectional (data) stream types.
export const DataId = {
Group: 0,
/// Carries a single SETUP message (moq-lite-05+).
Setup: 1,
} as const;
Loading
Loading