Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions inferno_aoip/src/device_server/flows_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ struct FlowsTransmitterInternal<P: ProxyToSamplesBuffer> {
timestamp_shift: ClockDiff,
tx_source_bit_depth: u8,
current_timestamp: Arc<AtomicUsize>,
/// The actual ring buffer position last read from (start_ts from the most recent TX cycle).
/// Exposed so external buffer writers can align their write positions.
read_position: Arc<AtomicUsize>,
/// Consistent (read_pos, monotonic_time) snapshot for precise timing calibration.
read_position_snapshot: Option<Arc<super::ReadPositionSnapshot>>,
/// Reference instant for monotonic_nanos in the snapshot.
snapshot_ref_instant: std::time::Instant,
on_transfer: Option<TransferNotifier>,
//callback: SamplesRequestCallback,
}
Expand Down Expand Up @@ -152,6 +159,11 @@ impl<P: ProxyToSamplesBuffer> FlowsTransmitterInternal<P> {
pbuff[1..5].copy_from_slice(&(seconds as u32).to_be_bytes());
pbuff[5..9].copy_from_slice(&(subsec_samples as u32).to_be_bytes());
let start_ts = (flow.next_ts as Clock).wrapping_add_signed(self.timestamp_shift);
self.read_position.store(start_ts, Ordering::Release);
if let Some(snapshot) = &self.read_position_snapshot {
let nanos = self.snapshot_ref_instant.elapsed().as_nanos() as u64;
snapshot.publish(start_ts, nanos);
}
for (index_in_flow, &ch_opt) in flow.channel_indices.iter().enumerate() {
if let Some(ch_index) = ch_opt {
//(self.callback)(flow.next_ts, ch_index, &mut tmp_samples[0..flow.fpp]);
Expand Down Expand Up @@ -474,6 +486,8 @@ impl FlowsTransmitter {
channels_outputs: Vec<RBOutput<Sample, P>>,
start_time_rx: Option<tokio::sync::oneshot::Receiver<Clock>>,
current_timestamp: Arc<AtomicUsize>,
read_position: Arc<AtomicUsize>,
read_position_snapshot: Option<Arc<super::ReadPositionSnapshot>>,
on_transfer: Option<TransferNotifier>,
) {
let latency: u32 = (latency_ns as u64 * sample_rate as u64 / 1_000_000_000u64).try_into().unwrap();
Expand All @@ -492,6 +506,15 @@ impl FlowsTransmitter {
.try_into()
.unwrap(),
current_timestamp,
read_position,
read_position_snapshot: read_position_snapshot.clone(),
snapshot_ref_instant: {
let now = std::time::Instant::now();
if let Some(snap) = &read_position_snapshot {
snap.init_ref_instant(now);
}
now
},
on_transfer,
};
internal.run(start_time_rx).await;
Expand All @@ -504,6 +527,8 @@ impl FlowsTransmitter {
channels_outputs: Vec<RBOutput<Sample, P>>,
start_time_rx: Option<tokio::sync::oneshot::Receiver<Clock>>,
current_timestamp: Arc<AtomicUsize>,
read_position: Arc<AtomicUsize>,
read_position_snapshot: Option<Arc<super::ReadPositionSnapshot>>,
on_transfer: Option<TransferNotifier>,
) -> (Self, JoinHandle<()>) {
let (tx, rx) = mpsc::channel(100);
Expand All @@ -522,6 +547,8 @@ impl FlowsTransmitter {
channels_outputs,
start_time_rx,
current_timestamp,
read_position,
read_position_snapshot,
on_transfer,
)
.boxed_local()
Expand Down
116 changes: 112 additions & 4 deletions inferno_aoip/src/device_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::mdns_client::{MdnsClient, PointerToMulticast};
use crate::media_clock::{
async_clock_receiver_to_realtime, make_shared_media_clock, start_clock_receiver, ClockReceiver,
};
use crate::ring_buffer::{self, OwnedBuffer, ProxyToBuffer, ProxyToSamplesBuffer, RBOutput};
use crate::ring_buffer::{self, ProxyToBuffer, ProxyToSamplesBuffer};
use crate::state_storage::StateStorage;
use atomic::Atomic;
use flows_tx::FlowsTransmitter;
Expand All @@ -16,7 +16,7 @@ use std::collections::BTreeMap;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, OnceLock, RwLock};

use std::time::{Duration, Instant};
use tokio::sync::{broadcast as broadcast_queue, mpsc, watch, Mutex};
Expand All @@ -42,10 +42,75 @@ pub(crate) mod tx_multicasts;

pub use crate::common::{Clock, ClockDiff, Sample};
pub use crate::media_clock::{MediaClock, RealTimeClockReceiver};
pub use crate::ring_buffer::{ExternalBufferParameters, PositionReportDestination};
pub use crate::ring_buffer::{ExternalBufferParameters, OwnedBuffer, PositionReportDestination, RBInput, RBOutput};
pub use crate::ring_buffer::new_owned as new_owned_ring_buffer;
pub use settings::Settings;
pub type AtomicSample = atomic::Atomic<Sample>;

/// Consistent (read_position, monotonic_time) snapshot written by the TX thread
/// at the exact point it updates `read_position`. Readers use a seqlock protocol:
/// odd seq = writer active, even seq = stable. Retry if seq changes between reads.
pub struct ReadPositionSnapshot {
seq: AtomicUsize,
read_position: AtomicUsize,
/// Nanoseconds elapsed since `ref_instant`.
monotonic_nanos: std::sync::atomic::AtomicU64,
/// Reference instant for monotonic_nanos. Set once by the TX thread at startup.
/// Readers reconstruct the snapshot instant as `ref_instant + Duration::from_nanos(monotonic_nanos)`.
ref_instant: OnceLock<std::time::Instant>,
}

impl ReadPositionSnapshot {
pub fn new() -> Self {
Self {
seq: AtomicUsize::new(0),
read_position: AtomicUsize::new(usize::MAX),
monotonic_nanos: std::sync::atomic::AtomicU64::new(0),
ref_instant: OnceLock::new(),
}
}

pub(crate) fn init_ref_instant(&self, instant: std::time::Instant) {
let _ = self.ref_instant.set(instant);
}

pub(crate) fn publish(&self, read_position: usize, monotonic_nanos: u64) {
let seq = self.seq.load(std::sync::atomic::Ordering::Relaxed);
self.seq
.store(seq.wrapping_add(1), std::sync::atomic::Ordering::Release);
self.read_position
.store(read_position, std::sync::atomic::Ordering::Relaxed);
self.monotonic_nanos
.store(monotonic_nanos, std::sync::atomic::Ordering::Relaxed);
self.seq
.store(seq.wrapping_add(2), std::sync::atomic::Ordering::Release);
}

pub fn try_read(&self) -> Option<(usize, std::time::Instant)> {
let ref_instant = *self.ref_instant.get()?;
for _ in 0..8 {
let seq1 = self.seq.load(std::sync::atomic::Ordering::Acquire);
if seq1 & 1 != 0 {
continue;
}
let pos = self
.read_position
.load(std::sync::atomic::Ordering::Relaxed);
let nanos = self
.monotonic_nanos
.load(std::sync::atomic::Ordering::Relaxed);
let seq2 = self.seq.load(std::sync::atomic::Ordering::Acquire);
if seq1 == seq2 {
if pos == usize::MAX {
return None;
}
return Some((pos, ref_instant + Duration::from_nanos(nanos)));
}
}
None
}
}

use channels_subscriber::{ChannelsBuffering, ChannelsSubscriber, ExternalBuffering, OwnedBuffering};
use peaks::peaks_of_buffers;
use samples_collector::{RealTimeSamplesReceiver, SamplesCallback, SamplesCollector};
Expand Down Expand Up @@ -286,13 +351,54 @@ impl DeviceServer {
tx_channels_buffers.iter().map(|par| ring_buffer::wrap_external_source(par, 0)).collect();
let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec();
*self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs));
self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, on_transfer).await;
self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, None, None, on_transfer).await;
}

/// Start transmitting from owned ring buffers.
///
/// Creates `channel_count` owned ring buffers and returns the `RBInput` write handles.
/// The caller writes audio samples via `RBInput::write_from_at()`.
/// The `RBOutput` read handles are passed to the internal transmitter.
///
/// Unlike `transmit_from_external_buffer`, owned buffers:
/// - Track `readable_pos` on the write side (inferno only reads validated data)
/// - Have `unconditional_read() == false` (reads check readable_pos)
/// - Support hole detection and fill via `hole_fix_wait`
///
/// The `read_position` atomic is updated by the FlowsTransmitter with the actual
/// ring buffer position it reads from (`start_ts = next_ts + timestamp_shift`).
/// This allows external writers to align their write positions correctly.
pub async fn transmit_from_owned_buffer(
&mut self,
channel_count: usize,
buffer_length: usize,
hole_fix_wait: usize,
start_time_rx: tokio::sync::oneshot::Receiver<Clock>,
current_timestamp: Arc<AtomicUsize>,
read_position: Arc<AtomicUsize>,
read_position_snapshot: Option<Arc<ReadPositionSnapshot>>,
on_transfer: Option<TransferNotifier>,
) -> Vec<ring_buffer::RBInput<Sample, OwnedBuffer<Atomic<Sample>>>> {
let mut rb_inputs = Vec::with_capacity(channel_count);
let mut rb_outputs = Vec::with_capacity(channel_count);
for _ in 0..channel_count {
let (input, output) = ring_buffer::new_owned(buffer_length, 0, hole_fix_wait);
rb_inputs.push(input);
rb_outputs.push(output);
}
let rbs = rb_outputs.iter().map(|rbo| rbo.shared().clone()).collect_vec();
*self.tx_peaks_supplier.write().unwrap() = Box::new(move || peaks_of_buffers(&rbs));
self.transmit(Some(start_time_rx), rb_outputs, current_timestamp, Some(read_position), read_position_snapshot, on_transfer).await;
rb_inputs
}

async fn transmit<P: ProxyToSamplesBuffer + Send + Sync + 'static>(
&mut self,
start_time_rx: Option<tokio::sync::oneshot::Receiver<Clock>>,
rb_outputs: Vec<RBOutput<Sample, P>>,
current_timestamp: Arc<AtomicUsize>,
read_position: Option<Arc<AtomicUsize>>,
read_position_snapshot: Option<Arc<ReadPositionSnapshot>>,
on_transfer: Option<TransferNotifier>,
) {
let clock_rx = self.clock_receiver.subscribe();
Expand All @@ -305,6 +411,8 @@ impl DeviceServer {
rb_outputs.clone(),
start_time_rx,
current_timestamp.clone(),
read_position.unwrap_or_else(|| Arc::new(AtomicUsize::new(usize::MAX))),
read_position_snapshot,
on_transfer,
);
*self.flows_tx.lock().await = Some(flows_tx_handle);
Expand Down