@@ -23,9 +23,11 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
2323use crate :: virtual_file:: owned_buffers_io:: io_buf_aligned:: IoBufAlignedMut ;
2424use crate :: virtual_file:: owned_buffers_io:: slice:: SliceMutExt ;
2525use crate :: virtual_file:: owned_buffers_io:: write:: { Buffer , FlushTaskError } ;
26- use crate :: virtual_file:: { self , IoBufferMut , TempVirtualFile , VirtualFile , owned_buffers_io} ;
26+ use crate :: virtual_file:: { self , IoBuffer , IoBufferMut , TempVirtualFile , VirtualFile , owned_buffers_io} ;
2727
2828use self :: owned_buffers_io:: write:: OwnedAsyncWriter ;
29+ use self :: owned_buffers_io:: io_buf_ext:: FullSlice ;
30+ use arc_swap:: ArcSwap ;
2931
3032pub struct EphemeralFile {
3133 _tenant_shard_id : TenantShardId ,
@@ -38,6 +40,10 @@ pub struct EphemeralFile {
3840 bytes_written : AtomicU64 ,
3941
4042 resource_units : std:: sync:: Mutex < GlobalResourceUnits > ,
43+
44+ /// A handle to the buffer that is currently being flushed.
45+ /// This allows to read from it without holding the buffered_writer lock.
46+ maybe_flushed_buffer : Arc < ArcSwap < Option < ( u64 , FullSlice < IoBuffer > ) > > > ,
4147}
4248
4349type BufferedWriter = owned_buffers_io:: write:: BufferedWriter <
@@ -93,24 +99,31 @@ impl EphemeralFile {
9399 gate. enter ( ) ?,
94100 ) ;
95101
102+ let writer = BufferedWriter :: new (
103+ file. clone ( ) ,
104+ 0 ,
105+ || IoBufferMut :: with_capacity ( TAIL_SZ ) ,
106+ gate. enter ( ) ?,
107+ cancel. clone ( ) ,
108+ ctx,
109+ tracing:: Span :: current ( ) ,
110+ ) ;
111+
112+ let maybe_flushed_buffer = writer. maybe_flushed_arc ( ) ;
113+
96114 let page_cache_file_id = page_cache:: next_file_id ( ) ; // XXX get rid, we're not page-caching anymore
97115
98116 Ok ( EphemeralFile {
99117 _tenant_shard_id : tenant_shard_id,
100118 _timeline_id : timeline_id,
101119 page_cache_file_id,
102- file : file. clone ( ) ,
103- buffered_writer : tokio:: sync:: RwLock :: new ( BufferedWriter :: new (
104- file,
105- 0 ,
106- || IoBufferMut :: with_capacity ( TAIL_SZ ) ,
107- gate. enter ( ) ?,
108- cancel. child_token ( ) ,
109- ctx,
110- info_span ! ( parent: None , "ephemeral_file_buffered_writer" , tenant_id=%tenant_shard_id. tenant_id, shard_id=%tenant_shard_id. shard_slug( ) , timeline_id=%timeline_id, path = %filename) ,
111- ) ) ,
120+ file,
121+ buffered_writer : tokio:: sync:: RwLock :: new ( writer) ,
122+ maybe_flushed_buffer,
112123 bytes_written : AtomicU64 :: new ( 0 ) ,
113- resource_units : std:: sync:: Mutex :: new ( GlobalResourceUnits :: new ( ) ) ,
124+ resource_units : std:: sync:: Mutex :: new (
125+ crate :: tenant:: storage_layer:: inmemory_layer:: GlobalResourceUnits :: new ( ) ,
126+ ) ,
114127 } )
115128 }
116129}
@@ -254,6 +267,26 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
254267 // TODO(vlad): Is there a nicer way of doing this?
255268 dst. as_mut_rust_slice_full_zeroed ( ) ;
256269
270+ let read_len = dst. bytes_total ( ) ;
271+ let end = start + read_len as u64 ;
272+
273+ // 1. Try lock-less read from maybe_flushed_buffer
274+ {
275+ let guard = self . maybe_flushed_buffer . load ( ) ;
276+ if let Some ( ( offset, buf) ) = guard. as_ref ( ) {
277+ let buf_start = * offset;
278+ let buf_end = buf_start + buf. len ( ) as u64 ;
279+
280+ // If the read is fully contained in the flushed buffer, serve it lock-lessly.
281+ if start >= buf_start && end <= buf_end {
282+ let src_start = ( start - buf_start) as usize ;
283+ let src_end = src_start + read_len;
284+ dst. as_mut_rust_slice_full_zeroed ( ) . copy_from_slice ( & buf[ src_start..src_end] ) ;
285+ return Ok ( ( dst, read_len) ) ;
286+ }
287+ }
288+ }
289+
257290 let writer = self . buffered_writer . read ( ) . await ;
258291
259292 // Read bytes written while under lock. This is a hack to deal with concurrent
@@ -276,7 +309,8 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
276309 } ;
277310
278311 let submitted_offset = writer. bytes_submitted ( ) ;
279- let maybe_flushed = writer. inspect_maybe_flushed ( ) ;
312+ let maybe_flushed_arc = writer. inspect_maybe_flushed ( ) . expect ( "always returns Some" ) ;
313+ let maybe_flushed = maybe_flushed_arc. as_ref ( ) ;
280314
281315 let mutable = match writer. inspect_mutable ( ) {
282316 Some ( mutable) => & mutable[ 0 ..mutable. pending ( ) ] ,
@@ -301,19 +335,26 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
301335 }
302336
303337 let ( written_range, maybe_flushed_range) = {
304- if maybe_flushed. is_some ( ) {
338+ if let Some ( ( offset, buf) ) = maybe_flushed {
339+ let buf_start = * offset;
340+ let buf_end = buf_start + buf. len ( ) as u64 ;
305341 // [ written ][ maybe_flushed ][ mutable ]
342+ // ^ ^
343+ // buf_start buf_end
306344 // ^
307345 // `submitted_offset`
308- // <++++++ on disk +++++++????????????????>
346+ //
347+ // Note: buf_end should equal submitted_offset if the flush loop is keeping up.
348+ // But maybe_flushed is a snapshot.
349+
309350 (
310351 Range (
311352 start,
312- std:: cmp:: min ( end, submitted_offset . saturating_sub ( TAIL_SZ as u64 ) ) ,
353+ std:: cmp:: min ( end, buf_start ) ,
313354 ) ,
314355 Range (
315- std:: cmp:: max ( start, submitted_offset . saturating_sub ( TAIL_SZ as u64 ) ) ,
316- std:: cmp:: min ( end, submitted_offset ) ,
356+ std:: cmp:: max ( start, buf_start ) ,
357+ std:: cmp:: min ( end, buf_end ) ,
317358 ) ,
318359 )
319360 } else {
@@ -363,14 +404,16 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
363404 } ;
364405
365406 let dst = if maybe_flushed_range. len ( ) > 0 {
407+ let ( offset, buf) = maybe_flushed. as_ref ( ) . unwrap ( ) ;
408+ let buf_start = * offset;
409+
366410 let offset_in_buffer = maybe_flushed_range
367411 . 0
368- . checked_sub ( submitted_offset . saturating_sub ( TAIL_SZ as u64 ) )
412+ . checked_sub ( buf_start )
369413 . unwrap ( )
370414 . into_usize ( ) ;
371- // Checked previously the buffer is Some.
372- let maybe_flushed = maybe_flushed. unwrap ( ) ;
373- let to_copy = & maybe_flushed
415+
416+ let to_copy = & buf
374417 [ offset_in_buffer..( offset_in_buffer + maybe_flushed_range. len ( ) . into_usize ( ) ) ] ;
375418 let bounds = dst. bounds ( ) ;
376419 let mut view = dst. slice ( {
@@ -544,7 +587,7 @@ mod tests {
544587
545588 let writer = file. buffered_writer . read ( ) . await ;
546589 let maybe_flushed_buffer_contents = writer. inspect_maybe_flushed ( ) . unwrap ( ) ;
547- assert_eq ! ( & maybe_flushed_buffer_contents[ ..] , & content[ cap..cap * 2 ] ) ;
590+ assert_eq ! ( & maybe_flushed_buffer_contents. as_ref ( ) . as_ref ( ) . unwrap ( ) . 1 [ ..] , & content[ cap..cap * 2 ] ) ;
548591
549592 let mutable_buffer_contents = writer. mutable ( ) ;
550593 assert_eq ! ( mutable_buffer_contents, & content[ cap * 2 ..write_nbytes] ) ;
@@ -583,7 +626,7 @@ mod tests {
583626 ) ;
584627 let writer = file. buffered_writer . read ( ) . await ;
585628 assert_eq ! (
586- & writer. inspect_maybe_flushed( ) . unwrap( ) [ 0 ..cap] ,
629+ & writer. inspect_maybe_flushed( ) . unwrap( ) . as_ref ( ) . as_ref ( ) . unwrap ( ) . 1 [ 0 ..cap] ,
587630 & content[ cap..cap * 2 ]
588631 ) ;
589632 assert_eq ! (
@@ -592,6 +635,92 @@ mod tests {
592635 ) ;
593636 }
594637
638+ #[ tokio:: test]
639+ async fn test_concurrent_lockless_read ( ) {
640+ let ( conf, tenant_id, timeline_id, ctx) =
641+ harness ( "test_concurrent_lockless_read" ) . unwrap ( ) ;
642+
643+ let gate = utils:: sync:: gate:: Gate :: default ( ) ;
644+ let cancel = CancellationToken :: new ( ) ;
645+ let file = Arc :: new (
646+ EphemeralFile :: create ( conf, tenant_id, timeline_id, & gate, & cancel, & ctx)
647+ . await
648+ . unwrap ( ) ,
649+ ) ;
650+
651+ let n_readers = 10 ;
652+ let n_writes = 100 ;
653+ let chunk_size = 1024 ;
654+
655+ let mut handles = Vec :: new ( ) ;
656+
657+ // Spawn writer
658+ let file_writer = file. clone ( ) ;
659+ let ctx_writer = ctx. attached_child ( ) ;
660+ handles. push ( tokio:: spawn ( async move {
661+ use rand:: SeedableRng ;
662+ let mut rng = rand:: rngs:: StdRng :: from_os_rng ( ) ;
663+ for i in 0 ..n_writes {
664+ let mut chunk = vec ! [ 0u8 ; chunk_size] ;
665+ rng. fill ( & mut chunk[ ..] ) ;
666+ // Fill with predictable data for verification
667+ for j in 0 ..chunk_size {
668+ chunk[ j] = ( i as u8 ) . wrapping_add ( j as u8 ) ;
669+ }
670+ file_writer. write_raw ( & chunk, & ctx_writer) . await . unwrap ( ) ;
671+ tokio:: task:: yield_now ( ) . await ;
672+ }
673+ } ) ) ;
674+
675+ // Spawn readers
676+ for reader_idx in 0 ..n_readers {
677+ let file_reader = file. clone ( ) ;
678+ let ctx_reader = ctx. attached_child ( ) ;
679+ handles. push ( tokio:: spawn ( async move {
680+ use rand:: SeedableRng ;
681+ let mut rng = rand:: rngs:: StdRng :: from_os_rng ( ) ;
682+ for _ in 0 ..n_writes * 2 {
683+ let len = file_reader. len ( ) ;
684+ if len == 0 {
685+ tokio:: task:: yield_now ( ) . await ;
686+ continue ;
687+ }
688+
689+ let read_len = rng. random_range ( 1 ..chunk_size * 2 ) ;
690+ let start = rng. random_range ( 0 ..len) ;
691+ let end = std:: cmp:: min ( start + read_len as u64 , len) ;
692+ let actual_read_len = ( end - start) as usize ;
693+
694+ if actual_read_len == 0 {
695+ continue ;
696+ }
697+
698+ let mut buf = IoBufferMut :: with_capacity ( actual_read_len) ;
699+ let ( buf_slice, n) = file_reader
700+ . read_exact_at_eof_ok ( start, buf. slice_full ( ) , & ctx_reader)
701+ . await
702+ . unwrap ( ) ;
703+ assert_eq ! ( n, actual_read_len) ;
704+
705+ let buf = buf_slice. into_inner ( ) ;
706+
707+ for k in 0 ..actual_read_len {
708+ let absolute_pos = start + k as u64 ;
709+ let chunk_idx = ( absolute_pos / chunk_size as u64 ) as usize ;
710+ let byte_idx = ( absolute_pos % chunk_size as u64 ) as usize ;
711+ let expected = ( chunk_idx as u8 ) . wrapping_add ( byte_idx as u8 ) ;
712+ assert_eq ! ( buf[ k] , expected, "Reader {} failed at pos {}" , reader_idx, absolute_pos) ;
713+ }
714+
715+ tokio:: task:: yield_now ( ) . await ;
716+ }
717+ } ) ) ;
718+ }
719+
720+ for handle in handles {
721+ handle. await . unwrap ( ) ;
722+ }
723+ }
595724 #[ tokio:: test]
596725 async fn test_read_split_across_file_and_buffer ( ) {
597726 // This test exercises the logic on the read path that splits the logical read
@@ -673,4 +802,101 @@ mod tests {
673802 in_progress. wait_until_flush_is_done ( ) . await ;
674803 test_read_all_offset_combinations ( ) . await ;
675804 }
676- }
805+
806+ #[ tokio:: test]
807+ async fn test_multi_writer_race_condition ( ) {
808+ let ( conf, tenant_id, timeline_id, ctx) =
809+ harness ( "test_multi_writer_race_condition" ) . unwrap ( ) ;
810+
811+ let gate = utils:: sync:: gate:: Gate :: default ( ) ;
812+ let cancel = CancellationToken :: new ( ) ;
813+ let file = Arc :: new (
814+ EphemeralFile :: create ( conf, tenant_id, timeline_id, & gate, & cancel, & ctx)
815+ . await
816+ . unwrap ( ) ,
817+ ) ;
818+
819+ let n_writers = 10 ;
820+ let n_iter = 100 ;
821+ let chunk_size = 1024 ;
822+
823+ let mut handles = Vec :: new ( ) ;
824+ for _ in 0 ..n_writers {
825+ let file = file. clone ( ) ;
826+ let ctx = ctx. attached_child ( ) ;
827+ handles. push ( tokio:: spawn ( async move {
828+ use rand:: SeedableRng ;
829+ let mut rng = rand:: rngs:: StdRng :: from_os_rng ( ) ;
830+ for _ in 0 ..n_iter {
831+ let mut chunk = vec ! [ 0u8 ; chunk_size] ;
832+ rng. fill ( & mut chunk[ ..] ) ;
833+ file. write_raw ( & chunk, & ctx) . await . unwrap ( ) ;
834+ }
835+ } ) ) ;
836+ }
837+
838+ for handle in handles {
839+ handle. await . unwrap ( ) ;
840+ }
841+
842+ let expected_len = ( n_writers * n_iter * chunk_size) as u64 ;
843+ assert_eq ! ( file. len( ) , expected_len) ;
844+ }
845+
846+ #[ tokio:: test]
847+ async fn test_concurrent_flush_no_stall ( ) {
848+ use std:: time:: { Duration , Instant } ;
849+ use crate :: context:: DownloadBehavior ;
850+ use crate :: task_mgr:: TaskKind ;
851+
852+ let ( conf, tenant_id, timeline_id, ctx) = harness ( "test_concurrent_flush_no_stall" ) . unwrap ( ) ;
853+ let gate = utils:: sync:: gate:: Gate :: default ( ) ;
854+ let cancel = CancellationToken :: new ( ) ;
855+
856+ let file = Arc :: new (
857+ EphemeralFile :: create ( conf, tenant_id, timeline_id, & gate, & cancel, & ctx)
858+ . await
859+ . unwrap ( ) ,
860+ ) ;
861+
862+ let writer_file = file. clone ( ) ;
863+ let writer_handle = tokio:: spawn ( async move {
864+ let writer_ctx = RequestContext :: new ( TaskKind :: UnitTest , DownloadBehavior :: Error ) ;
865+ let chunk = vec ! [ 0u8 ; TAIL_SZ ] ;
866+ for _ in 0 ..50 {
867+ writer_file. write_raw ( & chunk, & writer_ctx) . await . unwrap ( ) ;
868+ tokio:: task:: yield_now ( ) . await ;
869+ }
870+ } ) ;
871+
872+ let mut reader_handles = vec ! [ ] ;
873+ for i in 0 ..5 {
874+ let reader_file = file. clone ( ) ;
875+ reader_handles. push ( tokio:: spawn ( async move {
876+ let reader_ctx = RequestContext :: new ( TaskKind :: UnitTest , DownloadBehavior :: Error ) ;
877+ let start = Instant :: now ( ) ;
878+ let mut reads = 0 ;
879+ while start. elapsed ( ) < Duration :: from_secs ( 1 ) {
880+ let mut buf = IoBufferMut :: with_capacity ( 1024 ) ;
881+ if reader_file
882+ . read_exact_at_eof_ok ( 0 , buf. slice_full ( ) , & reader_ctx)
883+ . await
884+ . is_ok ( )
885+ {
886+ reads += 1 ;
887+ }
888+ }
889+ println ! ( "Reader {} completed {} reads" , i, reads) ;
890+ reads
891+ } ) ) ;
892+ }
893+
894+ writer_handle. await . unwrap ( ) ;
895+ let mut total_reads = 0 ;
896+ for h in reader_handles {
897+ total_reads += h. await . unwrap ( ) ;
898+ }
899+
900+ assert ! ( total_reads > 100 , "Readers were blocked by the flush lock!" ) ;
901+ }
902+ }
0 commit comments