Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 151 additions & 13 deletions rs/moq-native/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,30 @@ impl Client {
Ok(session)
}

/// Build the per-connection moq client, advertising the request path in the SETUP
/// for transports that carry no request URI of their own (raw QUIC, qmux over
/// TCP/UDS, raw iroh). WebTransport and WebSocket already convey the path in their
/// own request, so we omit it there to avoid duplicating it.
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket",
feature = "tcp",
feature = "uds"
))]
fn connect_client(&self, url: &Url) -> moq_net::Client {
if transport_carries_path(url) {
return self.moq.clone();
}

match request_path(url) {
Some(path) => self.moq.clone().with_path(path),
None => self.moq.clone(),
}
}

#[cfg(any(
feature = "noq",
feature = "quinn",
Expand All @@ -276,27 +300,31 @@ impl Client {
feature = "uds"
))]
async fn connect_inner(&self, url: Url) -> crate::Result<moq_net::Session> {
// Advertise the request path in the moq SETUP for transports that carry no
// request URI of their own; WebTransport and WebSocket already convey it.
let moq = self.connect_client(&url);

// Plain TCP (qmux, no TLS). Explicit opt-in scheme; never raced against
// QUIC, which can't speak it. Use only on a trusted network.
#[cfg(feature = "tcp")]
if url.scheme() == "tcp" {
let session = crate::tcp::connect(url, &self.versions.alpns()).await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}

// Unix domain socket (qmux, no TLS). Same-host only; the server can
// authenticate us by uid/gid via SO_PEERCRED.
#[cfg(all(feature = "uds", unix))]
if url.scheme() == "unix" {
let session = crate::unix::connect(url, &self.versions.alpns()).await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}

#[cfg(feature = "iroh")]
if url.scheme() == "iroh" {
let endpoint = self.iroh.as_ref().ok_or(Error::IrohDisabled)?;
let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
let session = self.moq.connect(session).await?;
let session = moq.connect(session).await?;
return Ok(session);
}

Expand All @@ -308,13 +336,13 @@ impl Client {

#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
return self.race_moq_connect(&moq, url, quic_handle).await;
}

#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}
}

Expand All @@ -326,13 +354,13 @@ impl Client {

#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
return self.race_moq_connect(&moq, url, quic_handle).await;
}

#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}
}

Expand All @@ -343,29 +371,29 @@ impl Client {

#[cfg(feature = "websocket")]
{
return self.race_moq_connect(url, quic_handle).await;
return self.race_moq_connect(&moq, url, quic_handle).await;
}

#[cfg(not(feature = "websocket"))]
{
let session = quic_handle.await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}
}

#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?;
return Ok(self.moq.connect(session).await?);
return Ok(moq.connect(session).await?);
}

#[cfg(not(feature = "websocket"))]
return Err(Error::NoBackend("no QUIC backend matched; this should not happen"));
}

#[cfg(feature = "websocket")]
async fn race_moq_connect<Q, S>(&self, url: Url, quic: Q) -> crate::Result<moq_net::Session>
async fn race_moq_connect<Q, S>(&self, moq: &moq_net::Client, url: Url, quic: Q) -> crate::Result<moq_net::Session>
where
Q: Future<Output = crate::Result<S>>,
S: web_transport_trait::Session,
Expand All @@ -380,12 +408,57 @@ impl Client {
};

match race_transport_connect(quic, websocket).await? {
TransportRace::Quic(quic) => Ok(self.moq.connect(quic).await?),
TransportRace::WebSocket(websocket) => Ok(self.moq.connect(websocket).await?),
TransportRace::Quic(quic) => Ok(moq.connect(quic).await?),
TransportRace::WebSocket(websocket) => Ok(moq.connect(websocket).await?),
}
}
}

/// Whether the transport for this URL always conveys the request path itself.
///
/// WebTransport (`https`/`http`) and WebSocket (`ws`/`wss`) carry the path in their
/// request, so it must not be duplicated in the moq SETUP. Everything else advertises
/// it in the SETUP: `moqt`/`moql` raw QUIC and qmux over `tcp`/`unix` have no request
/// URI, and `iroh` only carries one in its HTTP/3 mode (a raw iroh session does not),
/// so iroh always sends it in band to be safe.
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket",
feature = "tcp",
feature = "uds"
))]
fn transport_carries_path(url: &Url) -> bool {
matches!(url.scheme(), "https" | "http" | "ws" | "wss")
}

/// The request path to advertise in the moq SETUP, derived from the dial URL.
///
/// A `?path=` query overrides everything; it is the only way to set a path on a
/// `unix://` URL, whose URL path is the socket file rather than a namespace. Other
/// schemes (`tcp`, raw QUIC, `iroh`) use the URL path component. Returns `None` for a
/// `unix://` URL with no `?path=`, leaving the namespace at the root.
#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket",
feature = "tcp",
feature = "uds"
))]
fn request_path(url: &Url) -> Option<String> {
if let Some((_, path)) = url.query_pairs().find(|(key, _)| key == "path") {
return Some(path.into_owned());
}
match url.scheme() {
"unix" => None,
_ => Some(url.path().to_string()),
}
}

#[cfg(feature = "websocket")]
#[derive(Debug, PartialEq, Eq)]
enum TransportRace<Q, W> {
Expand Down Expand Up @@ -457,6 +530,71 @@ mod tests {
use super::*;
use clap::Parser;

#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket",
feature = "tcp",
feature = "uds"
))]
#[test]
fn classifies_transport_path() {
for url in ["https://h/p", "http://h/p", "wss://h/p", "ws://h/p"] {
assert!(
transport_carries_path(&Url::parse(url).unwrap()),
"{url} carries its own path"
);
}
// iroh is URL-less here: its raw mode carries no request URI, so it always
// advertises the path in the SETUP.
for url in [
"tcp://h:1/p",
"unix:///run/s.sock",
"moqt://h/p",
"moql://h/p",
"iroh://node/p",
] {
assert!(
!transport_carries_path(&Url::parse(url).unwrap()),
"{url} advertises in SETUP"
);
}
}

#[cfg(any(
feature = "noq",
feature = "quinn",
feature = "quiche",
feature = "iroh",
feature = "websocket",
feature = "tcp",
feature = "uds"
))]
#[test]
fn request_path_from_url() {
// Schemes with a free path component use it directly.
for url in ["tcp://h:1/anycast", "moqt://h/anycast", "iroh://node/anycast"] {
assert_eq!(
request_path(&Url::parse(url).unwrap()).as_deref(),
Some("/anycast"),
"{url}"
);
}
// A unix:// URL path is the socket file, so it has no namespace by default.
assert_eq!(request_path(&Url::parse("unix:///run/s.sock").unwrap()), None);
// ...but a ?path= query supplies one, leaving the socket path intact.
let uds = Url::parse("unix:///run/s.sock?path=/anycast").unwrap();
assert_eq!(uds.path(), "/run/s.sock");
assert_eq!(request_path(&uds).as_deref(), Some("/anycast"));
// ?path= overrides the URL path on any scheme.
assert_eq!(
request_path(&Url::parse("tcp://h:1/ignored?path=/win").unwrap()).as_deref(),
Some("/win")
);
}

#[test]
fn test_toml_disable_verify_survives_update_from() {
let toml = r#"
Expand Down
6 changes: 3 additions & 3 deletions rs/moq-native/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl Server {
Ok(session) => {
return Some(Request {
server,
kind: RequestKind::WebSocket(session),
kind: RequestKind::WebSocket(Box::new(session)),
});
}
Err(err) => tracing::debug!(%err, "failed to accept WebSocket session"),
Expand Down Expand Up @@ -479,7 +479,7 @@ pub(crate) enum RequestKind {
#[cfg(feature = "iroh")]
Iroh(crate::iroh::Request),
#[cfg(feature = "websocket")]
WebSocket(qmux::Session),
WebSocket(Box<qmux::Session>),
}

/// An incoming MoQ session that can be accepted or rejected.
Expand Down Expand Up @@ -573,7 +573,7 @@ impl Request {
.accept(request.ok().await.map_err(crate::iroh::Error::Server)?)
.await?),
#[cfg(feature = "websocket")]
RequestKind::WebSocket(session) => Ok(self.server.accept(session).await?),
RequestKind::WebSocket(session) => Ok(self.server.accept(*session).await?),
}
}

Expand Down
28 changes: 28 additions & 0 deletions rs/moq-net/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct Client {
consume: Option<OriginProducer>,
stats: StatsHandle,
versions: Versions,
path: Option<String>,
}

impl Client {
Expand Down Expand Up @@ -50,6 +51,25 @@ impl Client {
self
}

/// Set the request path to advertise in the SETUP (moq-lite-05).
///
/// Required on transports that carry no request URI (native QUIC, qmux over
/// TCP/TLS/UDS) so the server learns which path the client wants. Omit it on
/// bindings that already carry a URI (WebTransport). Ignored by versions with no
/// Setup stream (moq-lite-01 through 04). The value is normalized to an absolute
/// path (empty becomes `/`, a leading `/` is prepended).
pub fn with_path(mut self, path: impl Into<String>) -> Self {
let path = path.into();
self.path = Some(if path.is_empty() {
"/".to_string()
} else if path.starts_with('/') {
path
} else {
format!("/{path}")
});
self
}

/// Perform the MoQ handshake as a client negotiating the version.
pub async fn connect<S: web_transport_trait::Session>(&self, session: S) -> Result<Session, Error> {
if self.publish.is_none() && self.consume.is_none() {
Expand Down Expand Up @@ -127,13 +147,17 @@ impl Client {
.select(Version::Lite(lite::Version::Lite05Wip))
.ok_or(Error::Version)?;

let setup = lite::Setup {
path: self.path.clone(),
};
let recv_bw = lite::start(
session.clone(),
None,
self.publish.clone(),
self.consume.clone(),
self.stats.clone(),
lite::Version::Lite05Wip,
setup,
)?;

return Ok(Session::new(session, lite::Version::Lite05Wip.into(), recv_bw));
Expand All @@ -150,6 +174,7 @@ impl Client {
self.consume.clone(),
self.stats.clone(),
lite::Version::Lite04,
lite::Setup::default(),
)?;

return Ok(Session::new(session, lite::Version::Lite04.into(), recv_bw));
Expand All @@ -167,6 +192,7 @@ impl Client {
self.consume.clone(),
self.stats.clone(),
lite::Version::Lite03,
lite::Setup::default(),
)?;

return Ok(Session::new(session, lite::Version::Lite03.into(), recv_bw));
Expand Down Expand Up @@ -206,13 +232,15 @@ impl Client {
let recv_bw = match version {
Version::Lite(v) => {
let stream = stream.with_version(v);
// This path only negotiates lite-01/02, which have no Setup stream.
lite::start(
session.clone(),
Some(stream),
self.publish.clone(),
self.consume.clone(),
self.stats.clone(),
v,
lite::Setup::default(),
)?
}
Version::Ietf(v) => {
Expand Down
3 changes: 3 additions & 0 deletions rs/moq-net/src/lite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod priority;
mod probe;
mod publisher;
mod session;
mod setup;
mod stream;
mod subscribe;
mod subscriber;
Expand All @@ -32,6 +33,8 @@ pub use parameters::*;
pub use probe::*;
use publisher::*;
pub(super) use session::*;
pub use setup::Setup;
pub(super) use setup::{accept_setup, send_setup};
pub use stream::*;
pub use subscribe::*;
use subscriber::*;
Expand Down
Loading
Loading