Skip to content

Commit 9de32da

Browse files
author
umi
committed
paraReadBytes2
1 parent ff8d266 commit 9de32da

1 file changed

Lines changed: 50 additions & 44 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 50 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -828,8 +828,8 @@ impl AsyncFileReader for ArrowFileReader {
828828
}
829829

830830
fn get_byte_ranges(
831-
&mut self,
832-
ranges: Vec<Range<u64>>,
831+
&mut self,
832+
ranges: Vec<Range<u64>>,
833833
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
834834
let coalesce_bytes = self.range_coalesce_bytes;
835835
let concurrency = self.range_fetch_concurrency.max(1);
@@ -839,22 +839,27 @@ impl AsyncFileReader for ArrowFileReader {
839839
return Ok(vec![]);
840840
}
841841

842-
// Merge nearby ranges to reduce the number of object-store requests.
843-
let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes);
844-
let r = &self.r;
842+
// Calculate max merged range size to ensure enough ranges for concurrency.
843+
// For column-pruned reads, ranges are naturally spread out so this has no effect.
844+
// For full-table reads, this prevents everything from merging into 1 huge range.
845+
let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum();
846+
let max_merge_bytes = if concurrency > 1 {
847+
(total_bytes / concurrency as u64).max(1)
848+
} else {
849+
u64::MAX
850+
};
845851

846-
eprintln!(
847-
"[get_byte_ranges] original={}, merged={}",
848-
ranges.len(),
849-
fetch_ranges.len()
850-
);
852+
let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes);
851853

852854
// Fetch merged ranges concurrently.
855+
// NOTE: requires FileRead to be Sync. If FileRead is !Sync, either
856+
// add Sync bound or fall back to the sequential loop below.
857+
let r = &self.r;
853858
let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
854859
.map(|range| async move {
855860
r.read(range)
856861
.await
857-
.map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
862+
.map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
858863
})
859864
.buffered(concurrency)
860865
.try_collect()
@@ -902,39 +907,40 @@ impl AsyncFileReader for ArrowFileReader {
902907
///
903908
/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
904909
/// The input does not need to be sorted.
905-
fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    // Work on a copy ordered by start offset so that every range only has to be
    // compared against the group that is currently being built.
    let mut by_start = ranges.to_vec();
    by_start.sort_unstable_by_key(|range| range.start);

    let mut groups: Vec<Range<u64>> = Vec::with_capacity(by_start.len());
    for candidate in by_start {
        if let Some(open) = groups.last_mut() {
            // `checked_sub` yields `None` when the candidate starts before the
            // open group ends, i.e. the two overlap; overlapping ranges always join.
            let joins = match candidate.start.checked_sub(open.end) {
                Some(gap) => gap <= coalesce,
                None => true,
            };
            if joins {
                // Grow the open group; a candidate nested inside it leaves the end as is.
                open.end = open.end.max(candidate.end);
                continue;
            }
        }
        // Either this is the first range or the gap is too wide: start a new group.
        groups.push(candidate);
    }

    groups
}
910+
/// Sorts `ranges` by start offset and coalesces neighbours into larger fetch ranges.
///
/// A range joins the merged range currently being built when both conditions hold:
///
/// * it overlaps that merged range, or the gap between them is at most `coalesce` bytes;
/// * it is already fully covered by the merged range, or the span from the merged
///   range's start to the candidate's end is at most `max_merge_bytes`.
///
/// The first range of every merged range is emitted whole, so a single input range
/// larger than `max_merge_bytes` is never split. Pass `u64::MAX` as `max_merge_bytes`
/// to disable the size cap. The input does not need to be sorted; an empty input
/// yields an empty vector.
fn merge_byte_ranges(
    ranges: &[Range<u64>],
    coalesce: u64,
    max_merge_bytes: u64,
) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for next in sorted {
        if let Some(current) = merged.last_mut() {
            // Equivalent to "overlaps, or gap <= coalesce"; saturating keeps the
            // comparison sound when `current.end + coalesce` would exceed `u64::MAX`.
            let close_enough = next.start <= current.end.saturating_add(coalesce);

            // A range that is already covered adds no bytes to the request, so the
            // cap must not reject it: doing so would fetch the same bytes twice.
            // `saturating_sub` avoids an underflow panic on a malformed range whose
            // `end` lies before the merged range's start.
            let within_cap = next.end <= current.end
                || next.end.saturating_sub(current.start) <= max_merge_bytes;

            if close_enough && within_cap {
                current.end = current.end.max(next.end);
                continue;
            }
        }
        // First range, gap too wide, or cap reached: start a new merged range.
        merged.push(next);
    }

    merged
}
938944

939945
// ---------------------------------------------------------------------------
940946
// Tests

0 commit comments

Comments
 (0)