From 31cd4de90e2c89eab7a5efbd8bcc146b7c7ef231 Mon Sep 17 00:00:00 2001 From: Peter Salanki Date: Sun, 5 Apr 2026 10:29:54 -0400 Subject: [PATCH 1/3] Add transmit_from_owned_buffer() for non-ALSA TX clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a public method to DeviceServer that creates owned ring buffers and returns RBInput write handles to the caller. Unlike the ExternalBuffer path, owned buffers track readable_pos properly, so: - PositionReportDestination is updated on each write - Buffer occupancy metrics are accurate - unconditional_read() is false — inferno only reads validated data Also re-exports OwnedBuffer, RBInput, RBOutput, new_owned_ring_buffer from the device_server module for external consumers. --- inferno_aoip/src/device_server/flows_tx.rs | 8 ++++ inferno_aoip/src/device_server/mod.rs | 47 ++++++++++++++++++++-- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/inferno_aoip/src/device_server/flows_tx.rs b/inferno_aoip/src/device_server/flows_tx.rs index 58d03b1..c398602 100644 --- a/inferno_aoip/src/device_server/flows_tx.rs +++ b/inferno_aoip/src/device_server/flows_tx.rs @@ -103,6 +103,9 @@ struct FlowsTransmitterInternal { timestamp_shift: ClockDiff, tx_source_bit_depth: u8, current_timestamp: Arc, + /// The actual ring buffer position last read from (start_ts from the most recent TX cycle). + /// Exposed so external buffer writers can align their write positions. + read_position: Arc, on_transfer: Option, //callback: SamplesRequestCallback, } @@ -152,6 +155,7 @@ impl FlowsTransmitterInternal

{ pbuff[1..5].copy_from_slice(&(seconds as u32).to_be_bytes()); pbuff[5..9].copy_from_slice(&(subsec_samples as u32).to_be_bytes()); let start_ts = (flow.next_ts as Clock).wrapping_add_signed(self.timestamp_shift); + self.read_position.store(start_ts, Ordering::Release); for (index_in_flow, &ch_opt) in flow.channel_indices.iter().enumerate() { if let Some(ch_index) = ch_opt { //(self.callback)(flow.next_ts, ch_index, &mut tmp_samples[0..flow.fpp]); @@ -474,6 +478,7 @@ impl FlowsTransmitter { channels_outputs: Vec>, start_time_rx: Option>, current_timestamp: Arc, + read_position: Arc, on_transfer: Option, ) { let latency: u32 = (latency_ns as u64 * sample_rate as u64 / 1_000_000_000u64).try_into().unwrap(); @@ -492,6 +497,7 @@ impl FlowsTransmitter { .try_into() .unwrap(), current_timestamp, + read_position, on_transfer, }; internal.run(start_time_rx).await; @@ -504,6 +510,7 @@ impl FlowsTransmitter { channels_outputs: Vec>, start_time_rx: Option>, current_timestamp: Arc, + read_position: Arc, on_transfer: Option, ) -> (Self, JoinHandle<()>) { let (tx, rx) = mpsc::channel(100); @@ -522,6 +529,7 @@ impl FlowsTransmitter { channels_outputs, start_time_rx, current_timestamp, + read_position, on_transfer, ) .boxed_local() diff --git a/inferno_aoip/src/device_server/mod.rs b/inferno_aoip/src/device_server/mod.rs index 0729145..da6a113 100644 --- a/inferno_aoip/src/device_server/mod.rs +++ b/inferno_aoip/src/device_server/mod.rs @@ -2,7 +2,7 @@ use crate::mdns_client::{MdnsClient, PointerToMulticast}; use crate::media_clock::{ async_clock_receiver_to_realtime, make_shared_media_clock, start_clock_receiver, ClockReceiver, }; -use crate::ring_buffer::{self, OwnedBuffer, ProxyToBuffer, ProxyToSamplesBuffer, RBOutput}; +use crate::ring_buffer::{self, ProxyToBuffer, ProxyToSamplesBuffer}; use crate::state_storage::StateStorage; use atomic::Atomic; use flows_tx::FlowsTransmitter; @@ -42,7 +42,8 @@ pub(crate) mod tx_multicasts; pub use crate::common::{Clock, ClockDiff, Sample}; pub use crate::media_clock::{MediaClock, RealTimeClockReceiver}; -pub use crate::ring_buffer::{ExternalBufferParameters, PositionReportDestination}; +pub use crate::ring_buffer::{ExternalBufferParameters, OwnedBuffer, PositionReportDestination, RBInput, RBOutput}; +pub use crate::ring_buffer::new_owned as new_owned_ring_buffer; pub use settings::Settings; pub type AtomicSample = atomic::Atomic; @@ -286,13 +287,52 @@ impl DeviceServer { tx_channels_buffers.iter().map(|par| ring_buffer::wrap_external_source(par, 0)).collect(); let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec(); *self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs)); - self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, on_transfer).await; + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, None, on_transfer).await; } + + /// Start transmitting from owned ring buffers. + /// + /// Creates `channel_count` owned ring buffers and returns the `RBInput` write handles. + /// The caller writes audio samples via `RBInput::write_from_at()`. + /// The `RBOutput` read handles are passed to the internal transmitter. + /// + /// Unlike `transmit_from_external_buffer`, owned buffers: + /// - Track `readable_pos` on the write side (inferno only reads validated data) + /// - Have `unconditional_read() == false` (reads check readable_pos) + /// - Support hole detection and fill via `hole_fix_wait` + /// + /// The `read_position` atomic is updated by the FlowsTransmitter with the actual + /// ring buffer position it reads from (`start_ts = next_ts + timestamp_shift`). + /// This allows external writers to align their write positions correctly. + pub async fn transmit_from_owned_buffer( + &mut self, + channel_count: usize, + buffer_length: usize, + hole_fix_wait: usize, + start_time_rx: tokio::sync::oneshot::Receiver, + current_timestamp: Arc, + read_position: Arc, + on_transfer: Option, + ) -> Vec>>> { + let mut rb_inputs = Vec::with_capacity(channel_count); + let mut rb_outputs = Vec::with_capacity(channel_count); + for _ in 0..channel_count { + let (input, output) = ring_buffer::new_owned(buffer_length, 0, hole_fix_wait); + rb_inputs.push(input); + rb_outputs.push(output); + } + let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec(); + *self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs)); + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, Some(read_position), on_transfer).await; + rb_inputs + } + async fn transmit( &mut self, start_time_rx: Option>, rb_outputs: Vec>, current_timestamp: Arc, + read_position: Option>, on_transfer: Option, ) { let clock_rx = self.clock_receiver.subscribe(); @@ -305,6 +345,7 @@ impl DeviceServer { rb_outputs.clone(), start_time_rx, current_timestamp.clone(), + read_position.unwrap_or_else(|| Arc::new(AtomicUsize::new(usize::MAX))), on_transfer, ); *self.flows_tx.lock().await = Some(flows_tx_handle); From 67ecfbaca3840fa43463d2e6843f60bdd7735b1b Mon Sep 17 00:00:00 2001 From: Peter Salanki Date: Mon, 6 Apr 2026 01:17:10 -0400 Subject: [PATCH 2/3] Add ReadPositionSnapshot for precise TX timing observation Adds a seqlock-style shared snapshot that pairs (read_position, monotonic_nanos) at the exact point FlowsTransmitter updates read_position. This gives external buffer writers (like spin2dante) a consistent observation of when and where the TX thread is reading, without the imprecision of sampling read_pos and Instant::now() separately. - ReadPositionSnapshot struct with seq/read_position/monotonic_nanos atomics - Threaded through FlowsTransmitter::start(), run(), transmit_from_owned_buffer() - Written at the TX update site with odd/even seqlock protocol - Backward compatible: existing callers pass None for the snapshot --- inferno_aoip/src/device_server/flows_tx.rs | 19 ++++++++++++++- inferno_aoip/src/device_server/mod.rs | 27 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/inferno_aoip/src/device_server/flows_tx.rs b/inferno_aoip/src/device_server/flows_tx.rs index c398602..124483a 100644 --- a/inferno_aoip/src/device_server/flows_tx.rs +++ b/inferno_aoip/src/device_server/flows_tx.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, Ipv4Addr, UdpSocket}; use std::num::Wrapping; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize}; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; use std::{collections::BTreeMap, net::SocketAddr, sync::atomic::AtomicU32, time::Duration}; @@ -106,6 +106,10 @@ struct FlowsTransmitterInternal { /// The actual ring buffer position last read from (start_ts from the most recent TX cycle). /// Exposed so external buffer writers can align their write positions. read_position: Arc, + /// Consistent (read_pos, monotonic_time) snapshot for precise timing calibration. + read_position_snapshot: Option>, + /// Reference instant for monotonic_nanos in the snapshot. + snapshot_ref_instant: std::time::Instant, on_transfer: Option, //callback: SamplesRequestCallback, } @@ -156,6 +160,14 @@ impl FlowsTransmitterInternal

{ pbuff[5..9].copy_from_slice(&(subsec_samples as u32).to_be_bytes()); let start_ts = (flow.next_ts as Clock).wrapping_add_signed(self.timestamp_shift); self.read_position.store(start_ts, Ordering::Release); + if let Some(snapshot) = &self.read_position_snapshot { + let seq = snapshot.seq.load(Ordering::Relaxed); + snapshot.seq.store(seq.wrapping_add(1), Ordering::Release); // odd = writing + snapshot.read_position.store(start_ts, Ordering::Relaxed); + let nanos = self.snapshot_ref_instant.elapsed().as_nanos() as u64; + snapshot.monotonic_nanos.store(nanos, Ordering::Relaxed); + snapshot.seq.store(seq.wrapping_add(2), Ordering::Release); // even = stable + } for (index_in_flow, &ch_opt) in flow.channel_indices.iter().enumerate() { if let Some(ch_index) = ch_opt { //(self.callback)(flow.next_ts, ch_index, &mut tmp_samples[0..flow.fpp]); @@ -479,6 +491,7 @@ impl FlowsTransmitter { start_time_rx: Option>, current_timestamp: Arc, read_position: Arc, + read_position_snapshot: Option>, on_transfer: Option, ) { let latency: u32 = (latency_ns as u64 * sample_rate as u64 / 1_000_000_000u64).try_into().unwrap(); @@ -498,6 +511,8 @@ impl FlowsTransmitter { .unwrap(), current_timestamp, read_position, + read_position_snapshot, + snapshot_ref_instant: std::time::Instant::now(), on_transfer, }; internal.run(start_time_rx).await; @@ -511,6 +526,7 @@ impl FlowsTransmitter { start_time_rx: Option>, current_timestamp: Arc, read_position: Arc, + read_position_snapshot: Option>, on_transfer: Option, ) -> (Self, JoinHandle<()>) { let (tx, rx) = mpsc::channel(100); @@ -530,6 +546,7 @@ impl FlowsTransmitter { start_time_rx, current_timestamp, read_position, + read_position_snapshot, on_transfer, ) .boxed_local() diff --git a/inferno_aoip/src/device_server/mod.rs b/inferno_aoip/src/device_server/mod.rs index da6a113..4b3384c 100644 --- a/inferno_aoip/src/device_server/mod.rs +++ b/inferno_aoip/src/device_server/mod.rs @@ -47,6 +47,26 @@ pub use crate::ring_buffer::new_owned as new_owned_ring_buffer; pub use settings::Settings; pub type AtomicSample = atomic::Atomic; +/// Consistent (read_position, monotonic_time) snapshot written by the TX thread +/// at the exact point it updates `read_position`. Readers use a seqlock protocol: +/// odd seq = writer active, even seq = stable. Retry if seq changes between reads. +pub struct ReadPositionSnapshot { + pub seq: AtomicUsize, + pub read_position: AtomicUsize, + /// Nanoseconds elapsed since a reference Instant stored in the TX thread. + pub monotonic_nanos: std::sync::atomic::AtomicU64, +} + +impl ReadPositionSnapshot { + pub fn new() -> Self { + Self { + seq: AtomicUsize::new(0), + read_position: AtomicUsize::new(usize::MAX), + monotonic_nanos: std::sync::atomic::AtomicU64::new(0), + } + } +} + use channels_subscriber::{ChannelsBuffering, ChannelsSubscriber, ExternalBuffering, OwnedBuffering}; use peaks::peaks_of_buffers; use samples_collector::{RealTimeSamplesReceiver, SamplesCallback, SamplesCollector}; @@ -287,7 +307,7 @@ impl DeviceServer { tx_channels_buffers.iter().map(|par| ring_buffer::wrap_external_source(par, 0)).collect(); let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec(); *self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs)); - self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, None, on_transfer).await; + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, None, None, on_transfer).await; } /// Start transmitting from owned ring buffers. @@ -312,6 +332,7 @@ impl DeviceServer { start_time_rx: tokio::sync::oneshot::Receiver, current_timestamp: Arc, read_position: Arc, + read_position_snapshot: Option>, on_transfer: Option, ) -> Vec>>> { let mut rb_inputs = Vec::with_capacity(channel_count); @@ -323,7 +344,7 @@ impl DeviceServer { } let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec(); *self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs)); - self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, Some(read_position), on_transfer).await; + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, Some(read_position), read_position_snapshot, on_transfer).await; rb_inputs } @@ -333,6 +354,7 @@ impl DeviceServer { rb_outputs: Vec>, current_timestamp: Arc, read_position: Option>, + read_position_snapshot: Option>, on_transfer: Option, ) { let clock_rx = self.clock_receiver.subscribe(); @@ -346,6 +368,7 @@ impl DeviceServer { start_time_rx, current_timestamp.clone(), read_position.unwrap_or_else(|| Arc::new(AtomicUsize::new(usize::MAX))), + read_position_snapshot, on_transfer, ); *self.flows_tx.lock().await = Some(flows_tx_handle); From 5ad4ffdaf113926396a6d9ef1710a0dcee7f44b2 Mon Sep 17 00:00:00 2001 From: Peter Salanki Date: Mon, 6 Apr 2026 09:53:31 -0400 Subject: [PATCH 3/3] Expose ref_instant in ReadPositionSnapshot for correct time-base contract The TX thread's monotonic_nanos are relative to a reference Instant that only it knows. Previously the bridge had to guess. Now the snapshot exposes ref_instant via a Mutex>, set once at TX start. Readers reconstruct the snapshot instant as ref_instant + monotonic_nanos. --- inferno_aoip/src/device_server/flows_tx.rs | 18 ++++---- inferno_aoip/src/device_server/mod.rs | 54 ++++++++++++++++++++-- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/inferno_aoip/src/device_server/flows_tx.rs b/inferno_aoip/src/device_server/flows_tx.rs index 124483a..c56bbc9 100644 --- a/inferno_aoip/src/device_server/flows_tx.rs +++ b/inferno_aoip/src/device_server/flows_tx.rs @@ -1,6 +1,6 @@ use std::net::{IpAddr, Ipv4Addr, UdpSocket}; use std::num::Wrapping; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; use std::{collections::BTreeMap, net::SocketAddr, sync::atomic::AtomicU32, time::Duration}; @@ -161,12 +161,8 @@ impl FlowsTransmitterInternal

{ let start_ts = (flow.next_ts as Clock).wrapping_add_signed(self.timestamp_shift); self.read_position.store(start_ts, Ordering::Release); if let Some(snapshot) = &self.read_position_snapshot { - let seq = snapshot.seq.load(Ordering::Relaxed); - snapshot.seq.store(seq.wrapping_add(1), Ordering::Release); // odd = writing - snapshot.read_position.store(start_ts, Ordering::Relaxed); let nanos = self.snapshot_ref_instant.elapsed().as_nanos() as u64; - snapshot.monotonic_nanos.store(nanos, Ordering::Relaxed); - snapshot.seq.store(seq.wrapping_add(2), Ordering::Release); // even = stable + snapshot.publish(start_ts, nanos); } for (index_in_flow, &ch_opt) in flow.channel_indices.iter().enumerate() { if let Some(ch_index) = ch_opt { @@ -511,8 +507,14 @@ impl FlowsTransmitter { .unwrap(), current_timestamp, read_position, - read_position_snapshot, - snapshot_ref_instant: std::time::Instant::now(), + read_position_snapshot: read_position_snapshot.clone(), + snapshot_ref_instant: { + let now = std::time::Instant::now(); + if let Some(snap) = &read_position_snapshot { + snap.init_ref_instant(now); + } + now + }, on_transfer, }; internal.run(start_time_rx).await; diff --git a/inferno_aoip/src/device_server/mod.rs b/inferno_aoip/src/device_server/mod.rs index 4b3384c..2703c96 100644 --- a/inferno_aoip/src/device_server/mod.rs +++ b/inferno_aoip/src/device_server/mod.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::io::Write; use std::pin::Pin; use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, OnceLock, RwLock}; use std::time::{Duration, Instant}; use tokio::sync::{broadcast as broadcast_queue, mpsc, watch, Mutex}; @@ -51,10 +51,13 @@ pub type AtomicSample = atomic::Atomic; /// at the exact point it updates `read_position`. Readers use a seqlock protocol: /// odd seq = writer active, even seq = stable. Retry if seq changes between reads. pub struct ReadPositionSnapshot { - pub seq: AtomicUsize, - pub read_position: AtomicUsize, - /// Nanoseconds elapsed since a reference Instant stored in the TX thread. - pub monotonic_nanos: std::sync::atomic::AtomicU64, + seq: AtomicUsize, + read_position: AtomicUsize, + /// Nanoseconds elapsed since `ref_instant`. + monotonic_nanos: std::sync::atomic::AtomicU64, + /// Reference instant for monotonic_nanos. Set once by the TX thread at startup. + /// Readers reconstruct the snapshot instant as `ref_instant + Duration::from_nanos(monotonic_nanos)`. + ref_instant: OnceLock, } impl ReadPositionSnapshot { @@ -63,8 +66,49 @@ impl ReadPositionSnapshot { seq: AtomicUsize::new(0), read_position: AtomicUsize::new(usize::MAX), monotonic_nanos: std::sync::atomic::AtomicU64::new(0), + ref_instant: OnceLock::new(), } } + + pub(crate) fn init_ref_instant(&self, instant: std::time::Instant) { + let _ = self.ref_instant.set(instant); + } + + pub(crate) fn publish(&self, read_position: usize, monotonic_nanos: u64) { + let seq = self.seq.load(std::sync::atomic::Ordering::Relaxed); + self.seq + .store(seq.wrapping_add(1), std::sync::atomic::Ordering::Release); + self.read_position + .store(read_position, std::sync::atomic::Ordering::Relaxed); + self.monotonic_nanos + .store(monotonic_nanos, std::sync::atomic::Ordering::Relaxed); + self.seq + .store(seq.wrapping_add(2), std::sync::atomic::Ordering::Release); + } + + pub fn try_read(&self) -> Option<(usize, std::time::Instant)> { + let ref_instant = *self.ref_instant.get()?; + for _ in 0..8 { + let seq1 = self.seq.load(std::sync::atomic::Ordering::Acquire); + if seq1 & 1 != 0 { + continue; + } + let pos = self + .read_position + .load(std::sync::atomic::Ordering::Relaxed); + let nanos = self + .monotonic_nanos + .load(std::sync::atomic::Ordering::Relaxed); + let seq2 = self.seq.load(std::sync::atomic::Ordering::Acquire); + if seq1 == seq2 { + if pos == usize::MAX { + return None; + } + return Some((pos, ref_instant + Duration::from_nanos(nanos))); + } + } + None + } } use channels_subscriber::{ChannelsBuffering, ChannelsSubscriber, ExternalBuffering, OwnedBuffering};