Skip to content

Commit ba47ac9

Browse files
author
umi
committed
fix
1 parent 9de32da commit ba47ac9

1 file changed

Lines changed: 20 additions & 11 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -852,18 +852,27 @@ impl AsyncFileReader for ArrowFileReader {
852852
let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes);
853853

854854
// Fetch merged ranges concurrently.
855-
// NOTE: requires FileRead to be Sync. If FileRead is !Sync, either
856-
// add Sync bound or fall back to the sequential loop below.
857855
let r = &self.r;
858-
let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
859-
.map(|range| async move {
860-
r.read(range)
861-
.await
862-
.map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
863-
})
864-
.buffered(concurrency)
865-
.try_collect()
866-
.await?;
856+
let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
857+
// All ranges fit within the concurrency limit — fire them all at once.
858+
futures::future::try_join_all(fetch_ranges.iter().map(|range| {
859+
r.read(range.clone()).map_err(|e| {
860+
parquet::errors::ParquetError::External(format!("{e}").into())
861+
})
862+
}))
863+
.await?
864+
} else {
865+
// More ranges than concurrency slots — use buffered stream.
866+
futures::stream::iter(fetch_ranges.iter().cloned())
867+
.map(|range| async move {
868+
r.read(range).await.map_err(|e| {
869+
parquet::errors::ParquetError::External(format!("{e}").into())
870+
})
871+
})
872+
.buffered(concurrency)
873+
.try_collect()
874+
.await?
875+
};
867876

868877
// Slice the fetched data back into the originally requested ranges.
869878
Ok(ranges

0 commit comments

Comments
 (0)