diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 05fda2457..8cd8445b5 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -266,6 +266,30 @@ impl Client { Ok(session) } + /// Build the per-connection moq client, advertising the request path in the SETUP + /// for transports that carry no request URI of their own (raw QUIC, qmux over + /// TCP/UDS, raw iroh). WebTransport and WebSocket already convey the path in their + /// own request, so we omit it there to avoid duplicating it. + #[cfg(any( + feature = "noq", + feature = "quinn", + feature = "quiche", + feature = "iroh", + feature = "websocket", + feature = "tcp", + feature = "uds" + ))] + fn connect_client(&self, url: &Url) -> moq_net::Client { + if transport_carries_path(url) { + return self.moq.clone(); + } + + match request_path(url) { + Some(path) => self.moq.clone().with_path(path), + None => self.moq.clone(), + } + } + #[cfg(any( feature = "noq", feature = "quinn", @@ -276,12 +300,16 @@ impl Client { feature = "uds" ))] async fn connect_inner(&self, url: Url) -> crate::Result { + // Advertise the request path in the moq SETUP for transports that carry no + // request URI of their own; WebTransport and WebSocket already convey it. + let moq = self.connect_client(&url); + // Plain TCP (qmux, no TLS). Explicit opt-in scheme; never raced against // QUIC, which can't speak it. Use only on a trusted network. #[cfg(feature = "tcp")] if url.scheme() == "tcp" { let session = crate::tcp::connect(url, &self.versions.alpns()).await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } // Unix domain socket (qmux, no TLS). Same-host only; the server can @@ -289,14 +317,14 @@ impl Client { #[cfg(all(feature = "uds", unix))] if url.scheme() == "unix" { let session = crate::unix::connect(url, &self.versions.alpns()).await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } #[cfg(feature = "iroh")] if url.scheme() == "iroh" { let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?; let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?; - let session = self.moq.connect(session).await?; + let session = moq.connect(session).await?; return Ok(session); } @@ -308,13 +336,13 @@ impl Client { #[cfg(feature = "websocket")] { - return self.race_moq_connect(url, quic_handle).await; + return self.race_moq_connect(&moq, url, quic_handle).await; } #[cfg(not(feature = "websocket"))] { let session = quic_handle.await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } } @@ -326,13 +354,13 @@ impl Client { #[cfg(feature = "websocket")] { - return self.race_moq_connect(url, quic_handle).await; + return self.race_moq_connect(&moq, url, quic_handle).await; } #[cfg(not(feature = "websocket"))] { let session = quic_handle.await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } } @@ -343,13 +371,13 @@ impl Client { #[cfg(feature = "websocket")] { - return self.race_moq_connect(url, quic_handle).await; + return self.race_moq_connect(&moq, url, quic_handle).await; } #[cfg(not(feature = "websocket"))] { let session = quic_handle.await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } } @@ -357,7 +385,7 @@ impl Client { { let alpns = self.versions.alpns(); let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?; - return Ok(self.moq.connect(session).await?); + return Ok(moq.connect(session).await?); } #[cfg(not(feature = "websocket"))] @@ -365,7 +393,7 @@ impl Client { } #[cfg(feature = "websocket")] - async fn race_moq_connect(&self, url: Url, quic: Q) -> crate::Result + async fn race_moq_connect(&self, moq: &moq_net::Client, url: Url, quic: Q) -> crate::Result where Q: Future>, S: web_transport_trait::Session, @@ -380,12 +408,57 @@ impl Client { }; match race_transport_connect(quic, websocket).await? { - TransportRace::Quic(quic) => Ok(self.moq.connect(quic).await?), - TransportRace::WebSocket(websocket) => Ok(self.moq.connect(websocket).await?), + TransportRace::Quic(quic) => Ok(moq.connect(quic).await?), + TransportRace::WebSocket(websocket) => Ok(moq.connect(websocket).await?), } } } +/// Whether the transport for this URL always conveys the request path itself. +/// +/// WebTransport (`https`/`http`) and WebSocket (`ws`/`wss`) carry the path in their +/// request, so it must not be duplicated in the moq SETUP. Everything else advertises +/// it in the SETUP: `moqt`/`moql` raw QUIC and qmux over `tcp`/`unix` have no request +/// URI, and `iroh` only carries one in its HTTP/3 mode (a raw iroh session does not), +/// so iroh always sends it in band to be safe. +#[cfg(any( + feature = "noq", + feature = "quinn", + feature = "quiche", + feature = "iroh", + feature = "websocket", + feature = "tcp", + feature = "uds" +))] +fn transport_carries_path(url: &Url) -> bool { + matches!(url.scheme(), "https" | "http" | "ws" | "wss") +} + +/// The request path to advertise in the moq SETUP, derived from the dial URL. +/// +/// A `?path=` query overrides everything; it is the only way to set a path on a +/// `unix://` URL, whose URL path is the socket file rather than a namespace. Other +/// schemes (`tcp`, raw QUIC, `iroh`) use the URL path component. Returns `None` for a +/// `unix://` URL with no `?path=`, leaving the namespace at the root. +#[cfg(any( + feature = "noq", + feature = "quinn", + feature = "quiche", + feature = "iroh", + feature = "websocket", + feature = "tcp", + feature = "uds" +))] +fn request_path(url: &Url) -> Option { + if let Some((_, path)) = url.query_pairs().find(|(key, _)| key == "path") { + return Some(path.into_owned()); + } + match url.scheme() { + "unix" => None, + _ => Some(url.path().to_string()), + } +} + #[cfg(feature = "websocket")] #[derive(Debug, PartialEq, Eq)] enum TransportRace { @@ -457,6 +530,71 @@ mod tests { use super::*; use clap::Parser; + #[cfg(any( + feature = "noq", + feature = "quinn", + feature = "quiche", + feature = "iroh", + feature = "websocket", + feature = "tcp", + feature = "uds" + ))] + #[test] + fn classifies_transport_path() { + for url in ["https://h/p", "http://h/p", "wss://h/p", "ws://h/p"] { + assert!( + transport_carries_path(&Url::parse(url).unwrap()), + "{url} carries its own path" + ); + } + // iroh is URL-less here: its raw mode carries no request URI, so it always + // advertises the path in the SETUP. + for url in [ + "tcp://h:1/p", + "unix:///run/s.sock", + "moqt://h/p", + "moql://h/p", + "iroh://node/p", + ] { + assert!( + !transport_carries_path(&Url::parse(url).unwrap()), + "{url} advertises in SETUP" + ); + } + } + + #[cfg(any( + feature = "noq", + feature = "quinn", + feature = "quiche", + feature = "iroh", + feature = "websocket", + feature = "tcp", + feature = "uds" + ))] + #[test] + fn request_path_from_url() { + // Schemes with a free path component use it directly. + for url in ["tcp://h:1/anycast", "moqt://h/anycast", "iroh://node/anycast"] { + assert_eq!( + request_path(&Url::parse(url).unwrap()).as_deref(), + Some("/anycast"), + "{url}" + ); + } + // A unix:// URL path is the socket file, so it has no namespace by default. + assert_eq!(request_path(&Url::parse("unix:///run/s.sock").unwrap()), None); + // ...but a ?path= query supplies one, leaving the socket path intact. + let uds = Url::parse("unix:///run/s.sock?path=/anycast").unwrap(); + assert_eq!(uds.path(), "/run/s.sock"); + assert_eq!(request_path(&uds).as_deref(), Some("/anycast")); + // ?path= overrides the URL path on any scheme. + assert_eq!( + request_path(&Url::parse("tcp://h:1/ignored?path=/win").unwrap()).as_deref(), + Some("/win") + ); + } + #[test] fn test_toml_disable_verify_survives_update_from() { let toml = r#" diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 0e6fd39f5..3d1c19c68 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -393,7 +393,7 @@ impl Server { Ok(session) => { return Some(Request { server, - kind: RequestKind::WebSocket(session), + kind: RequestKind::WebSocket(Box::new(session)), }); } Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"), @@ -479,7 +479,7 @@ pub(crate) enum RequestKind { #[cfg(feature = "iroh")] Iroh(crate::iroh::Request), #[cfg(feature = "websocket")] - WebSocket(qmux::Session), + WebSocket(Box), } /// An incoming MoQ session that can be accepted or rejected. @@ -573,7 +573,7 @@ impl Request { .accept(request.ok().await.map_err(crate::iroh::Error::Server)?) .await?), #[cfg(feature = "websocket")] - RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?), + RequestKind::WebSocket(session) => Ok(self.server.accept(*session).await?), } } diff --git a/rs/moq-net/src/client.rs b/rs/moq-net/src/client.rs index 204289c51..6db9e2840 100644 --- a/rs/moq-net/src/client.rs +++ b/rs/moq-net/src/client.rs @@ -12,6 +12,7 @@ pub struct Client { consume: Option, stats: StatsHandle, versions: Versions, + path: Option, } impl Client { @@ -50,6 +51,25 @@ impl Client { self } + /// Set the request path to advertise in the SETUP (moq-lite-05). + /// + /// Required on transports that carry no request URI (native QUIC, qmux over + /// TCP/TLS/UDS) so the server learns which path the client wants. Omit it on + /// bindings that already carry a URI (WebTransport). Ignored by versions with no + /// Setup stream (moq-lite-01 through 04). The value is normalized to an absolute + /// path (empty becomes `/`, a leading `/` is prepended). + pub fn with_path(mut self, path: impl Into) -> Self { + let path = path.into(); + self.path = Some(if path.is_empty() { + "/".to_string() + } else if path.starts_with('/') { + path + } else { + format!("/{path}") + }); + self + } + /// Perform the MoQ handshake as a client negotiating the version. pub async fn connect(&self, session: S) -> Result { if self.publish.is_none() && self.consume.is_none() { @@ -127,6 +147,9 @@ impl Client { .select(Version::Lite(lite::Version::Lite05Wip)) .ok_or(Error::Version)?; + let setup = lite::Setup { + path: self.path.clone(), + }; let recv_bw = lite::start( session.clone(), None, @@ -134,6 +157,7 @@ impl Client { self.consume.clone(), self.stats.clone(), lite::Version::Lite05Wip, + setup, )?; return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw)); @@ -150,6 +174,7 @@ impl Client { self.consume.clone(), self.stats.clone(), lite::Version::Lite04, + lite::Setup::default(), )?; return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw)); @@ -167,6 +192,7 @@ impl Client { self.consume.clone(), self.stats.clone(), lite::Version::Lite03, + lite::Setup::default(), )?; return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw)); @@ -206,6 +232,7 @@ impl Client { let recv_bw = match version { Version::Lite(v) => { let stream = stream.with_version(v); + // This path only negotiates lite-01/02, which have no Setup stream. lite::start( session.clone(), Some(stream), @@ -213,6 +240,7 @@ impl Client { self.consume.clone(), self.stats.clone(), v, + lite::Setup::default(), )? } Version::Ietf(v) => { diff --git a/rs/moq-net/src/lite/mod.rs b/rs/moq-net/src/lite/mod.rs index e155af914..46b694532 100644 --- a/rs/moq-net/src/lite/mod.rs +++ b/rs/moq-net/src/lite/mod.rs @@ -15,6 +15,7 @@ mod priority; mod probe; mod publisher; mod session; +mod setup; mod stream; mod subscribe; mod subscriber; @@ -32,6 +33,8 @@ pub use parameters::*; pub use probe::*; use publisher::*; pub(super) use session::*; +pub use setup::Setup; +pub(super) use setup::{accept_setup, send_setup}; pub use stream::*; pub use subscribe::*; use subscriber::*; diff --git a/rs/moq-net/src/lite/parameters.rs b/rs/moq-net/src/lite/parameters.rs index 1236da79d..f44fe01fe 100644 --- a/rs/moq-net/src/lite/parameters.rs +++ b/rs/moq-net/src/lite/parameters.rs @@ -9,6 +9,18 @@ const MAX_PARAMS: u64 = 64; #[derive(Default, Debug, Clone)] pub struct Parameters(HashMap>); +impl Parameters { + /// Look up a raw parameter value by ID. + pub fn get(&self, id: u64) -> Option<&[u8]> { + self.0.get(&id).map(Vec::as_slice) + } + + /// Set a raw parameter value by ID, replacing any existing one. + pub fn set(&mut self, id: u64, value: Vec) { + self.0.insert(id, value); + } +} + impl Decode for Parameters { fn decode(mut r: &mut R, version: Version) -> Result { let mut map = HashMap::new(); diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index 59226523b..7f19dfdd4 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -3,7 +3,7 @@ use crate::{ lite::SessionInfo, }; -use super::{Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; +use super::{Publisher, PublisherConfig, Setup, Subscriber, SubscriberConfig, Version, send_setup}; pub fn start( session: S, // The stream used to setup the session, after exchanging setup messages. @@ -17,6 +17,9 @@ pub fn start( stats: StatsHandle, // The version of the protocol to use. version: Version, + // The SETUP message to advertise on the Setup stream (moq-lite-05+). Ignored on + // earlier versions, which have no Setup stream. + our_setup: Setup, ) -> Result, Error> { let recv_bw = BandwidthProducer::new(); @@ -49,6 +52,20 @@ pub fn start( version, }); + // moq-lite-05 reintroduced a Setup stream: each endpoint opens one and sends a + // single SETUP message advertising its optional capabilities. + if version.has_setup_stream() { + let session = session.clone(); + web_async::spawn(async move { + if let Err(err) = send_setup(&session, version, our_setup).await { + // The peer gates serving on our SETUP, so a failure to send it must + // tear the session down rather than leave the peer waiting. + tracing::warn!(%err, "failed to send setup stream"); + session.close(err.to_code(), &err.to_string()); + } + }); + } + web_async::spawn(async move { let res = tokio::select! { Err(res) = run_session(setup) => Err(res), diff --git a/rs/moq-net/src/lite/setup.rs b/rs/moq-net/src/lite/setup.rs new file mode 100644 index 000000000..d7c0cc7b4 --- /dev/null +++ b/rs/moq-net/src/lite/setup.rs @@ -0,0 +1,185 @@ +//! The moq-lite-05 SETUP message and its unidirectional Setup stream. +//! +//! Each endpoint opens one Setup stream at the start of the session, sends a single +//! SETUP message advertising its optional capabilities, and closes it (FIN). Unknown +//! parameters are ignored so new ones stay backward compatible. + +use crate::{Error, coding::*}; + +use super::{DataType, Message, Parameters, Version}; + +/// Setup parameter ID for the request Path (client-only, on URI-less transports). +const PARAM_PATH: u64 = 0x2; + +/// The SETUP message, sent once per endpoint on a Setup stream (moq-lite-05+). +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Setup { + /// The request path, sent by a client on a transport binding that carries no + /// request URI (native QUIC, qmux over TCP/TLS/UDS). When present it begins with + /// `/`. A server never sends one, and it is absent on URI-carrying bindings + /// (WebTransport), which already convey the path. + pub path: Option, +} + +impl Message for Setup { + fn encode_msg(&self, w: &mut W, version: Version) -> Result<(), EncodeError> { + if !version.has_setup_stream() { + return Err(EncodeError::Version); + } + + let mut params = Parameters::default(); + if let Some(path) = &self.path { + // The path must be an absolute URI path; reject malformed values rather + // than emitting something a peer must close the session over. + if !path.starts_with('/') { + return Err(EncodeError::InvalidState); + } + params.set(PARAM_PATH, path.clone().into_bytes()); + } + + params.encode(w, version) + } + + fn decode_msg(r: &mut R, version: Version) -> Result { + if !version.has_setup_stream() { + return Err(DecodeError::Version); + } + + let params = Parameters::decode(r, version)?; + + let path = match params.get(PARAM_PATH) { + Some(bytes) => { + let s = std::str::from_utf8(bytes).map_err(|_| DecodeError::InvalidValue)?; + // Must be an absolute URI path; URL-carrying transports can never + // produce anything else, and the relay scopes auth from it. + if !s.starts_with('/') { + return Err(DecodeError::InvalidValue); + } + Some(s.to_string()) + } + None => None, + }; + + Ok(Self { path }) + } +} + +/// Open a Setup stream, send a single SETUP message, and close it (FIN). +/// +/// Called once on connect; each endpoint opens exactly one Setup stream. +pub(crate) async fn send_setup( + session: &S, + version: Version, + setup: Setup, +) -> Result<(), Error> { + let stream = session.open_uni().await.map_err(Error::from_transport)?; + + let mut writer = Writer::new(stream, version); + writer.encode(&DataType::Setup).await?; + writer.encode(&setup).await?; + writer.finish()?; + + Ok(()) +} + +/// Read the peer's SETUP off its Setup stream, returning once it arrives. +/// +/// Used on the server to learn the client's request path before deciding what to +/// serve. Any data stream (e.g. a Group) that races ahead of the Setup stream is +/// reset and skipped; the peer sends exactly one SETUP, so this resolves quickly. +pub(crate) async fn accept_setup( + session: &S, + version: Version, +) -> Result { + loop { + let recv = session.accept_uni().await.map_err(Error::from_transport)?; + let mut reader = Reader::new(recv, version); + + match reader.decode::().await? { + DataType::Setup => return reader.decode().await, + DataType::Group => reader.abort(&Error::Cancel), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + fn roundtrip(setup: &Setup) -> Setup { + let mut buf = BytesMut::new(); + setup.encode(&mut buf, Version::Lite05Wip).unwrap(); + let mut slice = &buf[..]; + let decoded = Setup::decode(&mut slice, Version::Lite05Wip).unwrap(); + assert!(bytes::Buf::remaining(&slice) == 0, "trailing bytes after decode"); + decoded + } + + #[test] + fn empty() { + assert_eq!(roundtrip(&Setup::default()), Setup::default()); + } + + #[test] + fn with_path() { + let setup = Setup { + path: Some("/live/room".to_string()), + }; + assert_eq!(roundtrip(&setup), setup); + } + + #[test] + fn rejects_relative_path() { + // Encoding a non-absolute path fails rather than emitting bad wire bytes. + let mut buf = BytesMut::new(); + assert!(matches!( + Setup { + path: Some("foo".into()) + } + .encode(&mut buf, Version::Lite05Wip), + Err(EncodeError::InvalidState) + )); + + // Decoding a hand-rolled relative path is rejected at the wire boundary. + let mut params = Parameters::default(); + params.set(PARAM_PATH, b"foo".to_vec()); + let mut body = BytesMut::new(); + params.encode(&mut body, Version::Lite05Wip).unwrap(); + let mut framed = BytesMut::new(); + (body.len() as u64).encode(&mut framed, Version::Lite05Wip).unwrap(); + framed.extend_from_slice(&body); + assert!(matches!( + Setup::decode(&mut framed, Version::Lite05Wip), + Err(DecodeError::InvalidValue) + )); + } + + #[test] + fn rejects_before_lite05() { + let mut buf = BytesMut::new(); + assert!(matches!( + Setup::default().encode(&mut buf, Version::Lite04), + Err(EncodeError::Version) + )); + } + + #[test] + fn ignores_unknown_parameters() { + // Encode a SETUP carrying an unknown parameter ID alongside the path. + let mut params = Parameters::default(); + params.set(PARAM_PATH, b"/foo".to_vec()); + params.set(0xbeef, b"whatever".to_vec()); + + let mut body = BytesMut::new(); + params.encode(&mut body, Version::Lite05Wip).unwrap(); + + // Wrap with the message size prefix the Message impl expects. + let mut buf = BytesMut::new(); + (body.len() as u64).encode(&mut buf, Version::Lite05Wip).unwrap(); + buf.extend_from_slice(&body); + + let decoded = Setup::decode(&mut buf, Version::Lite05Wip).unwrap(); + assert_eq!(decoded.path.as_deref(), Some("/foo")); + } +} diff --git a/rs/moq-net/src/lite/stream.rs b/rs/moq-net/src/lite/stream.rs index d52a77943..22a5cc539 100644 --- a/rs/moq-net/src/lite/stream.rs +++ b/rs/moq-net/src/lite/stream.rs @@ -34,6 +34,8 @@ impl Encode for ControlType { #[repr(u64)] pub enum DataType { Group = 0, + /// Carries a single SETUP message (moq-lite-05+). + Setup = 1, } impl Decode for DataType { diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 712f38f02..9ce7bbd03 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -115,6 +115,7 @@ impl Subscriber { let res = match kind { lite::DataType::Group => self.recv_group(&mut stream).await, + lite::DataType::Setup => self.recv_setup(&mut stream).await, }; if let Err(err) = res { @@ -459,6 +460,20 @@ impl Subscriber { Ok(()) } + async fn recv_setup(&self, stream: &mut Reader) -> Result<(), Error> { + // The Setup stream only exists in moq-lite-05+; reject it on older versions. + if self.version != Version::Lite05Wip { + return Err(Error::UnexpectedStream); + } + + let setup: lite::Setup = stream.decode().await?; + tracing::debug!(?setup, "received setup"); + + // TODO: surface the negotiated capabilities (path, probe) to the session. + + Ok(()) + } + pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let hdr: lite::Group = stream.decode().await?; diff --git a/rs/moq-net/src/lite/version.rs b/rs/moq-net/src/lite/version.rs index 791e86c12..73d5f4639 100644 --- a/rs/moq-net/src/lite/version.rs +++ b/rs/moq-net/src/lite/version.rs @@ -12,6 +12,15 @@ pub enum Version { Lite05Wip, } +impl Version { + /// Whether this version uses a unidirectional Setup stream (moq-lite-05+). + /// + /// Earlier versions negotiate purely via ALPN and exchange no SETUP message. + pub fn has_setup_stream(self) -> bool { + matches!(self, Self::Lite05Wip) + } +} + impl fmt::Display for Version { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { diff --git a/rs/moq-net/src/server.rs b/rs/moq-net/src/server.rs index 2ae855daa..ac7bb4622 100644 --- a/rs/moq-net/src/server.rs +++ b/rs/moq-net/src/server.rs @@ -50,54 +50,51 @@ impl Server { self } - /// Perform the MoQ handshake as a server for the given session. + /// Perform the MoQ handshake as a server, returning the established [`Session`]. + /// + /// Convenience wrapper over [`accept_request`](Self::accept_request) that completes + /// the handshake immediately. Use `accept_request` when you need to inspect the + /// client's advertised path before deciding what to serve. pub async fn accept(&self, session: S) -> Result { - if self.publish.is_none() && self.consume.is_none() { - tracing::warn!("not publishing or consuming anything"); - } + self.accept_request(session).await?.ok().await + } + + /// Begin the MoQ handshake, pausing once the client's request path is known so the + /// caller can authorize or scope before serving. + /// + /// Reads the client's SETUP (the in-band path lives there on URL-less transports), + /// then returns a [`Request`]: inspect [`path`](Request::path), set the origins to + /// serve, and call [`ok`](Request::ok) or [`close`](Request::close). Starting the + /// session is deferred to `ok()`, so origins set on the `Request` take effect. + /// + /// The path is surfaced for moq-lite-05; it is `None` on versions with no in-band + /// request path (lite 01-04, and the IETF drafts in this build). + pub async fn accept_request(&self, session: S) -> Result, Error> { + // Regimes without an in-band path defer to `ok()` without surfacing one. + let deferred = |handshake| Request { + server: self.clone(), + path: None, + handshake, + }; let (encoding, supported) = match session.protocol() { Some(ALPN_18) => { - let v = self - .versions + self.versions .select(Version::Ietf(ietf::Version::Draft18)) .ok_or(Error::Version)?; - - // Draft-17+: SETUP is exchanged in the background by the session. - ietf::start( - session.clone(), - None, - None, - false, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), - ietf::Version::Draft18, - )?; - - tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None)); + return Ok(deferred(Handshake::IetfModern { + session, + version: ietf::Version::Draft18, + })); } Some(ALPN_17) => { - let v = self - .versions + self.versions .select(Version::Ietf(ietf::Version::Draft17)) .ok_or(Error::Version)?; - - // Draft-17+: SETUP is exchanged in the background by the session. - ietf::start( - session.clone(), - None, - None, - false, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), - ietf::Version::Draft17, - )?; - - tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None)); + return Ok(deferred(Handshake::IetfModern { + session, + version: ietf::Version::Draft17, + })); } Some(ALPN_16) => { let v = self @@ -125,49 +122,32 @@ impl Server { .select(Version::Lite(lite::Version::Lite05Wip)) .ok_or(Error::Version)?; - let recv_bw = lite::start( - session.clone(), - None, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), - lite::Version::Lite05Wip, - )?; - - return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw)); + // Gate on the client's SETUP: read it before serving so the caller can + // scope by the advertised path. + let client_setup = lite::accept_setup(&session, lite::Version::Lite05Wip).await?; + return Ok(Request { + server: self.clone(), + path: client_setup.path, + handshake: Handshake::Lite05 { session }, + }); } Some(ALPN_LITE_04) => { self.versions .select(Version::Lite(lite::Version::Lite04)) .ok_or(Error::Version)?; - - let recv_bw = lite::start( - session.clone(), - None, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), - lite::Version::Lite04, - )?; - - return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw)); + return Ok(deferred(Handshake::LiteBare { + session, + version: lite::Version::Lite04, + })); } Some(ALPN_LITE_03) => { self.versions .select(Version::Lite(lite::Version::Lite03)) .ok_or(Error::Version)?; - - // Starting with draft-03, there's no more SETUP control stream. - let recv_bw = lite::start( - session.clone(), - None, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), - lite::Version::Lite03, - )?; - - return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw)); + return Ok(deferred(Handshake::LiteBare { + session, + version: lite::Version::Lite03, + })); } Some(ALPN_LITE) | None => { let supported = self.versions.filter(&NEGOTIATED.into()).ok_or(Error::Version)?; @@ -176,11 +156,11 @@ impl Server { Some(p) => return Err(Error::UnknownAlpn(p.to_string())), }; + // Legacy bidi SETUP exchange (IETF 14-16, lite 01/02). Read the client's SETUP + // to choose the version; `ok()` sends the server SETUP and starts the session. let mut stream = Stream::accept(&session, encoding).await?; - let mut client: setup::Client = stream.reader.decode().await?; - // Choose the version to use let version = client .versions .iter() @@ -188,6 +168,144 @@ impl Server { .find(|v| supported.contains(v)) .ok_or(Error::Version)?; + // Pull the max request ID out now (IETF only) so `ok()` doesn't re-decode the + // consumed parameters. + let request_id_max = match version { + Version::Ietf(v) => { + let params = ietf::Parameters::decode(&mut client.parameters, v)?; + params + .get_varint(ietf::ParameterVarInt::MaxRequestId) + .map(ietf::RequestId) + } + Version::Lite(_) => None, + }; + + Ok(deferred(Handshake::Legacy { + session, + stream, + version, + request_id_max, + })) + } +} + +/// A paused server-side handshake. +/// +/// Returned by [`Server::accept_request`] once the client's advertised +/// [`path`](Self::path) is known but before the session is granted anything. Set the +/// origins to serve, then call [`ok`](Self::ok) to complete the handshake, or +/// [`close`](Self::close) to reject it. Modeled on the WebTransport `Request`. +pub struct Request { + server: Server, + path: Option, + handshake: Handshake, +} + +/// The handshake state captured at the pause point. Every variant defers its session +/// start to [`Request::ok`] so origins set on the `Request` still apply. +enum Handshake { + /// Modern IETF (17/18): SETUP is exchanged in the background by the session. + IetfModern { session: S, version: ietf::Version }, + /// moq-lite 03/04: no Setup stream. + LiteBare { session: S, version: lite::Version }, + /// moq-lite 05+: the client's Setup stream has already been read. + Lite05 { session: S }, + /// Legacy IETF (14-16) and lite 01/02: the client SETUP has been read off the bidi + /// stream but the server SETUP hasn't been sent. `ok()` finishes it. + Legacy { + session: S, + stream: Stream, + version: Version, + request_id_max: Option, + }, +} + +impl Request { + /// The request path the client advertised in its SETUP, if any. + /// + /// Populated for moq-lite-05; `None` on versions without an in-band request path. + /// See the note on [`Server::accept_request`]. + pub fn path(&self) -> Option<&str> { + self.path.as_deref() + } + + /// Publish to the connected client. Overrides any value from the [`Server`] + /// builder; typically set after inspecting [`path`](Self::path). + pub fn with_publish(mut self, publish: impl Into>) -> Self { + self.server.publish = publish.into(); + self + } + + /// Subscribe to the connected client. Overrides any value from the [`Server`] builder. + pub fn with_consume(mut self, consume: impl Into>) -> Self { + self.server.consume = consume.into(); + self + } + + /// Set the tier-scoped stats handle. Overrides any value from the [`Server`] builder. + pub fn with_stats(mut self, stats: StatsHandle) -> Self { + self.server.stats = stats; + self + } + + /// Accept the session, completing the handshake. + pub async fn ok(self) -> Result { + let server = self.server; + + // Warn here, not in `accept_request`: callers attach origins on the Request + // (after inspecting the path), so checking earlier gives false positives. + if server.publish.is_none() && server.consume.is_none() { + tracing::warn!("not publishing or consuming anything"); + } + + let (session, mut stream, version, request_id_max) = match self.handshake { + Handshake::IetfModern { session, version } => { + ietf::start( + session.clone(), + None, + None, + false, + server.publish, + server.consume, + server.stats, + version, + )?; + tracing::debug!(?version, "connected"); + return Ok(Session::new(session, version.into(), None)); + } + Handshake::LiteBare { session, version } => { + let recv_bw = lite::start( + session.clone(), + None, + server.publish, + server.consume, + server.stats, + version, + lite::Setup::default(), + )?; + return Ok(Session::new(session, version.into(), recv_bw)); + } + Handshake::Lite05 { session } => { + // A server never advertises a request path. + let recv_bw = lite::start( + session.clone(), + None, + server.publish, + server.consume, + server.stats, + lite::Version::Lite05Wip, + lite::Setup::default(), + )?; + return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw)); + } + Handshake::Legacy { + session, + stream, + version, + request_id_max, + } => (session, stream, version, request_id_max), + }; + // Encode parameters using the version-appropriate type. let parameters = match version { Version::Ietf(v) => { @@ -199,40 +317,36 @@ impl Server { Version::Lite(v) => lite::Parameters::default().encode_bytes(v)?, }; - let server = setup::Server { + let server_setup = setup::Server { version: version.into(), parameters, }; - stream.writer.encode(&server).await?; + stream.writer.encode(&server_setup).await?; let recv_bw = match version { Version::Lite(v) => { let stream = stream.with_version(v); + // Pre-lite-05: no Setup stream, so nothing to advertise. lite::start( session.clone(), Some(stream), - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), + server.publish, + server.consume, + server.stats, v, + lite::Setup::default(), )? } Version::Ietf(v) => { - // Decode the client's parameters to get their max request ID. - let parameters = ietf::Parameters::decode(&mut client.parameters, v)?; - let request_id_max = parameters - .get_varint(ietf::ParameterVarInt::MaxRequestId) - .map(ietf::RequestId); - let stream = stream.with_version(v); ietf::start( session.clone(), Some(stream), request_id_max, false, - self.publish.clone(), - self.consume.clone(), - self.stats.clone(), + server.publish, + server.consume, + server.stats, v, )?; None @@ -241,4 +355,192 @@ impl Server { Ok(Session::new(session, version, recv_bw)) } + + /// Reject the session, closing the transport with `err`'s wire code. + pub fn close(self, err: Error) { + let session = match self.handshake { + Handshake::IetfModern { session, .. } => session, + Handshake::LiteBare { session, .. } => session, + Handshake::Lite05 { session } => session, + Handshake::Legacy { session, .. } => session, + }; + session.close(err.to_code(), &err.to_string()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{ + collections::VecDeque, + sync::{Arc, Mutex}, + }; + + use bytes::Bytes; + + #[derive(Debug, Clone, Default)] + struct FakeError; + impl std::fmt::Display for FakeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "fake transport error") + } + } + impl std::error::Error for FakeError {} + impl web_transport_trait::Error for FakeError { + fn session_error(&self) -> Option<(u32, String)> { + Some((0, "closed".to_string())) + } + } + + /// A session that replays a queue of unidirectional streams (each a `Vec`) in + /// order from `accept_uni`; everything else is inert. + #[derive(Clone)] + struct FakeSession { + protocol: Option<&'static str>, + uni: Arc>>>, + } + + impl FakeSession { + fn new(protocol: &'static str, uni: impl IntoIterator>) -> Self { + Self { + protocol: Some(protocol), + uni: Arc::new(Mutex::new(uni.into_iter().collect())), + } + } + } + + impl web_transport_trait::Session for FakeSession { + type SendStream = FakeSend; + type RecvStream = FakeRecv; + type Error = FakeError; + + async fn accept_uni(&self) -> Result { + // Drop the guard before any await so the future stays Send. + let data = self.uni.lock().unwrap().pop_front(); + match data { + Some(data) => Ok(FakeRecv { data: data.into() }), + None => std::future::pending().await, + } + } + async fn accept_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> { + std::future::pending().await + } + async fn open_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::Error> { + std::future::pending().await + } + async fn open_uni(&self) -> Result { + std::future::pending().await + } + fn send_datagram(&self, _payload: Bytes) -> Result<(), Self::Error> { + Ok(()) + } + async fn recv_datagram(&self) -> Result { + std::future::pending().await + } + fn max_datagram_size(&self) -> usize { + 1200 + } + fn protocol(&self) -> Option<&str> { + self.protocol + } + fn close(&self, _code: u32, _reason: &str) {} + async fn closed(&self) -> Self::Error { + std::future::pending().await + } + } + + #[derive(Clone, Default)] + struct FakeSend; + impl web_transport_trait::SendStream for FakeSend { + type Error = FakeError; + async fn write(&mut self, buf: &[u8]) -> Result { + Ok(buf.len()) + } + fn set_priority(&mut self, _order: u8) {} + fn finish(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + fn reset(&mut self, _code: u32) {} + async fn closed(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + } + + struct FakeRecv { + data: VecDeque, + } + impl web_transport_trait::RecvStream for FakeRecv { + type Error = FakeError; + async fn read(&mut self, dst: &mut [u8]) -> Result, Self::Error> { + if self.data.is_empty() { + return Ok(None); + } + let size = dst.len().min(self.data.len()); + for slot in dst.iter_mut().take(size) { + *slot = self.data.pop_front().unwrap(); + } + Ok(Some(size)) + } + fn stop(&mut self, _code: u32) {} + async fn closed(&mut self) -> Result<(), Self::Error> { + Ok(()) + } + } + + /// Encode a lite-05 Setup stream: the `DataType::Setup` tag then the SETUP message. + fn lite05_setup(path: Option<&str>) -> Vec { + let v = lite::Version::Lite05Wip; + let mut buf = Vec::new(); + lite::DataType::Setup.encode(&mut buf, v).unwrap(); + lite::Setup { + path: path.map(str::to_string), + } + .encode(&mut buf, v) + .unwrap(); + buf + } + + /// Encode a lite-05 Group uni stream header (just the `DataType::Group` tag). + fn lite05_group() -> Vec { + let mut buf = Vec::new(); + lite::DataType::Group + .encode(&mut buf, lite::Version::Lite05Wip) + .unwrap(); + buf + } + + #[tokio::test(start_paused = true)] + async fn accept_request_reads_lite05_path() { + let session = FakeSession::new(ALPN_LITE_05_WIP, [lite05_setup(Some("/team/room"))]); + let request = Server::new() + .with_versions(Version::Lite(lite::Version::Lite05Wip).into()) + .accept_request(session) + .await + .unwrap(); + assert_eq!(request.path(), Some("/team/room")); + } + + #[tokio::test(start_paused = true)] + async fn accept_request_lite05_without_path_is_none() { + let session = FakeSession::new(ALPN_LITE_05_WIP, [lite05_setup(None)]); + let request = Server::new() + .with_versions(Version::Lite(lite::Version::Lite05Wip).into()) + .accept_request(session) + .await + .unwrap(); + assert_eq!(request.path(), None); + } + + #[tokio::test(start_paused = true)] + async fn accept_request_skips_uni_stream_before_setup() { + // A Group racing ahead of the SETUP is reset and skipped; the gate keeps + // reading until it finds the SETUP. + let session = FakeSession::new(ALPN_LITE_05_WIP, [lite05_group(), lite05_setup(Some("/team/room"))]); + let request = Server::new() + .with_versions(Version::Lite(lite::Version::Lite05Wip).into()) + .accept_request(session) + .await + .unwrap(); + assert_eq!(request.path(), Some("/team/room")); + } } diff --git a/rs/moq-relay/src/internal.rs b/rs/moq-relay/src/internal.rs index e4b5e3446..cd8feaedd 100644 --- a/rs/moq-relay/src/internal.rs +++ b/rs/moq-relay/src/internal.rs @@ -209,23 +209,24 @@ fn spawn_session(session: S, cluster: Cluster) where S: web_transport_trait::Session, { - // Full access to everything under the empty root, on the internal tier. - let token = AuthToken::unrestricted(moq_net::Path::new("").to_owned()); - let publish = cluster.publisher(&token); - let subscribe = cluster.subscriber(&token); let stats = cluster.stats.tier(moq_net::Tier::Internal); let serve = async move { + // Read the client's SETUP first: a moq-lite-05 worker can request a path + // (these transports carry no request URI), which scopes its full internal + // access to that subtree. No path means the empty root, as before. + let request = moq_net::Server::new().with_stats(stats).accept_request(session).await?; + + let root = moq_net::Path::new(request.path().unwrap_or("")).to_owned(); + let token = AuthToken::unrestricted(root); + let publish = cluster.publisher(&token); + let subscribe = cluster.subscriber(&token); + // subscribe/publish look backwards on purpose: see connection.rs. We publish // the tracks the client may subscribe to, and subscribe to what it may publish. - let session = moq_net::Server::new() - .with_publish(subscribe) - .with_consume(publish) - .with_stats(stats) - .accept(session) - .await?; - - tracing::info!(version = %session.version(), "negotiated"); + let session = request.with_publish(subscribe).with_consume(publish).ok().await?; + + tracing::info!(version = %session.version(), root = %token.root, "negotiated"); session.closed().await?; anyhow::Ok(()) };