diff --git a/inferno_aoip/src/device_server/flows_tx.rs b/inferno_aoip/src/device_server/flows_tx.rs index 58d03b1..c56bbc9 100644 --- a/inferno_aoip/src/device_server/flows_tx.rs +++ b/inferno_aoip/src/device_server/flows_tx.rs @@ -103,6 +103,13 @@ struct FlowsTransmitterInternal { timestamp_shift: ClockDiff, tx_source_bit_depth: u8, current_timestamp: Arc, + /// The actual ring buffer position last read from (start_ts from the most recent TX cycle). + /// Exposed so external buffer writers can align their write positions. + read_position: Arc, + /// Consistent (read_pos, monotonic_time) snapshot for precise timing calibration. + read_position_snapshot: Option>, + /// Reference instant for monotonic_nanos in the snapshot. + snapshot_ref_instant: std::time::Instant, on_transfer: Option, //callback: SamplesRequestCallback, } @@ -152,6 +159,11 @@ impl FlowsTransmitterInternal

{ pbuff[1..5].copy_from_slice(&(seconds as u32).to_be_bytes()); pbuff[5..9].copy_from_slice(&(subsec_samples as u32).to_be_bytes()); let start_ts = (flow.next_ts as Clock).wrapping_add_signed(self.timestamp_shift); + self.read_position.store(start_ts, Ordering::Release); + if let Some(snapshot) = &self.read_position_snapshot { + let nanos = self.snapshot_ref_instant.elapsed().as_nanos() as u64; + snapshot.publish(start_ts, nanos); + } for (index_in_flow, &ch_opt) in flow.channel_indices.iter().enumerate() { if let Some(ch_index) = ch_opt { //(self.callback)(flow.next_ts, ch_index, &mut tmp_samples[0..flow.fpp]); @@ -474,6 +486,8 @@ impl FlowsTransmitter { channels_outputs: Vec>, start_time_rx: Option>, current_timestamp: Arc, + read_position: Arc, + read_position_snapshot: Option>, on_transfer: Option, ) { let latency: u32 = (latency_ns as u64 * sample_rate as u64 / 1_000_000_000u64).try_into().unwrap(); @@ -492,6 +506,15 @@ impl FlowsTransmitter { .try_into() .unwrap(), current_timestamp, + read_position, + read_position_snapshot: read_position_snapshot.clone(), + snapshot_ref_instant: { + let now = std::time::Instant::now(); + if let Some(snap) = &read_position_snapshot { + snap.init_ref_instant(now); + } + now + }, on_transfer, }; internal.run(start_time_rx).await; @@ -504,6 +527,8 @@ impl FlowsTransmitter { channels_outputs: Vec>, start_time_rx: Option>, current_timestamp: Arc, + read_position: Arc, + read_position_snapshot: Option>, on_transfer: Option, ) -> (Self, JoinHandle<()>) { let (tx, rx) = mpsc::channel(100); @@ -522,6 +547,8 @@ impl FlowsTransmitter { channels_outputs, start_time_rx, current_timestamp, + read_position, + read_position_snapshot, on_transfer, ) .boxed_local() diff --git a/inferno_aoip/src/device_server/mod.rs b/inferno_aoip/src/device_server/mod.rs index 0729145..2703c96 100644 --- a/inferno_aoip/src/device_server/mod.rs +++ b/inferno_aoip/src/device_server/mod.rs @@ -2,7 +2,7 @@ use crate::mdns_client::{MdnsClient, PointerToMulticast}; 
/// Consistent `(read_position, monotonic_time)` snapshot written by the TX
/// thread at the exact point it updates `read_position`.
///
/// The two values are protected by a single-writer seqlock: `seq` is odd while
/// the writer is in the middle of an update and even when the pair is stable.
/// Readers retry whenever `seq` is odd or changes across their read.
///
/// `read_position == usize::MAX` is reserved as the "nothing published yet"
/// marker; [`ReadPositionSnapshot::try_read`] reports it as `None`.
#[derive(Debug)]
pub struct ReadPositionSnapshot {
    /// Seqlock generation counter (odd = write in progress, even = stable).
    seq: AtomicUsize,
    /// Ring buffer position most recently published by the writer.
    read_position: AtomicUsize,
    /// Nanoseconds elapsed since `ref_instant` at the time of publication.
    monotonic_nanos: std::sync::atomic::AtomicU64,
    /// Reference instant for `monotonic_nanos`; set once, later sets are ignored.
    /// Readers reconstruct the snapshot instant as
    /// `ref_instant + Duration::from_nanos(monotonic_nanos)`.
    ref_instant: OnceLock<std::time::Instant>,
}

impl Default for ReadPositionSnapshot {
    fn default() -> Self {
        Self::new()
    }
}

impl ReadPositionSnapshot {
    /// Creates an empty snapshot: no reference instant and no published position.
    pub fn new() -> Self {
        Self {
            seq: AtomicUsize::new(0),
            read_position: AtomicUsize::new(usize::MAX),
            monotonic_nanos: std::sync::atomic::AtomicU64::new(0),
            ref_instant: OnceLock::new(),
        }
    }

    /// Records the reference instant that `monotonic_nanos` is measured from.
    ///
    /// Only the first call has an effect; subsequent calls are silently ignored
    /// so that readers never observe the reference changing under them.
    pub(crate) fn init_ref_instant(&self, instant: std::time::Instant) {
        let _ = self.ref_instant.set(instant);
    }

    /// Publishes a new `(read_position, monotonic_nanos)` pair.
    ///
    /// Must only be called from a single writer; concurrent writers would race
    /// on the non-atomic read-modify-write of `seq`.
    pub(crate) fn publish(&self, read_position: usize, monotonic_nanos: u64) {
        use std::sync::atomic::{fence, Ordering};

        let seq = self.seq.load(Ordering::Relaxed);
        // Mark the write as in progress. A release *store* alone would not keep
        // the data stores below from being reordered before it, so a release
        // *fence* is placed between the odd marker and the payload.
        self.seq.store(seq.wrapping_add(1), Ordering::Relaxed);
        fence(Ordering::Release);

        self.read_position.store(read_position, Ordering::Relaxed);
        self.monotonic_nanos.store(monotonic_nanos, Ordering::Relaxed);

        // Release store: the payload stores above are visible to any reader
        // that observes the new even sequence number.
        self.seq.store(seq.wrapping_add(2), Ordering::Release);
    }

    /// Attempts to read a consistent `(read_position, instant)` pair.
    ///
    /// Returns `None` when the reference instant has not been initialised, when
    /// nothing has been published yet, when the reconstructed instant is not
    /// representable, or when no stable snapshot could be observed within
    /// 8 attempts.
    pub fn try_read(&self) -> Option<(usize, std::time::Instant)> {
        use std::sync::atomic::{fence, Ordering};

        let ref_instant = *self.ref_instant.get()?;
        for _ in 0..8 {
            let before = self.seq.load(Ordering::Acquire);
            if before & 1 != 0 {
                // Writer is mid-update; give it a moment before retrying.
                std::hint::spin_loop();
                continue;
            }

            let pos = self.read_position.load(Ordering::Relaxed);
            let nanos = self.monotonic_nanos.load(Ordering::Relaxed);

            // An acquire *load* of `seq` would not stop the payload loads above
            // from being reordered after it; the acquire fence does.
            fence(Ordering::Acquire);
            let after = self.seq.load(Ordering::Relaxed);

            if before == after {
                if pos == usize::MAX {
                    return None;
                }
                return ref_instant
                    .checked_add(Duration::from_nanos(nanos))
                    .map(|instant| (pos, instant));
            }
        }
        None
    }
}
self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, on_transfer).await; + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, None, None, on_transfer).await; } + + /// Start transmitting from owned ring buffers. + /// + /// Creates `channel_count` owned ring buffers and returns the `RBInput` write handles. + /// The caller writes audio samples via `RBInput::write_from_at()`. + /// The `RBOutput` read handles are passed to the internal transmitter. + /// + /// Unlike `transmit_from_external_buffer`, owned buffers: + /// - Track `readable_pos` on the write side (inferno only reads validated data) + /// - Have `unconditional_read() == false` (reads check readable_pos) + /// - Support hole detection and fill via `hole_fix_wait` + /// + /// The `read_position` atomic is updated by the FlowsTransmitter with the actual + /// ring buffer position it reads from (`start_ts = next_ts + timestamp_shift`). + /// This allows external writers to align their write positions correctly. 
+ pub async fn transmit_from_owned_buffer( + &mut self, + channel_count: usize, + buffer_length: usize, + hole_fix_wait: usize, + start_time_rx: tokio::sync::oneshot::Receiver, + current_timestamp: Arc, + read_position: Arc, + read_position_snapshot: Option>, + on_transfer: Option, + ) -> Vec>>> { + let mut rb_inputs = Vec::with_capacity(channel_count); + let mut rb_outputs = Vec::with_capacity(channel_count); + for _ in 0..channel_count { + let (input, output) = ring_buffer::new_owned(buffer_length, 0, hole_fix_wait); + rb_inputs.push(input); + rb_outputs.push(output); + } + let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec(); + *self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs)); + self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, Some(read_position), read_position_snapshot, on_transfer).await; + rb_inputs + } + async fn transmit( &mut self, start_time_rx: Option>, rb_outputs: Vec>, current_timestamp: Arc, + read_position: Option>, + read_position_snapshot: Option>, on_transfer: Option, ) { let clock_rx = self.clock_receiver.subscribe(); @@ -305,6 +411,8 @@ impl DeviceServer { rb_outputs.clone(), start_time_rx, current_timestamp.clone(), + read_position.unwrap_or_else(|| Arc::new(AtomicUsize::new(usize::MAX))), + read_position_snapshot, on_transfer, ); *self.flows_tx.lock().await = Some(flows_tx_handle);