Skip to content

Commit 69de7f4

Browse files
author
umi
committed
fix
1 parent 1a89cba commit 69de7f4

1 file changed

Lines changed: 43 additions & 31 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,11 @@ const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
791791
const RANGE_FETCH_CONCURRENCY: usize = 8;
792792
/// Default metadata prefetch hint: 512 KiB.
793793
const METADATA_SIZE_HINT: usize = 512 * 1024;
794+
/// Minimum range size for splitting: 4 MiB.
795+
/// Matches Java Paimon's `batchSizeForVectorReads` default.
796+
/// Ranges smaller than this will not be split further to avoid
797+
/// excessive small IO requests whose per-request overhead dominates.
798+
const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024;
794799

795800
impl ArrowFileReader {
796801
fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
@@ -858,17 +863,36 @@ impl AsyncFileReader for ArrowFileReader {
858863
};
859864

860865
// Slice the fetched data back into the originally requested ranges.
861-
Ok(ranges
866+
let result: parquet::errors::Result<Vec<Bytes>> = ranges
862867
.iter()
863868
.map(|range| {
864-
let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
869+
let pp = fetch_ranges.partition_point(|v| v.start <= range.start);
870+
let idx = pp.checked_sub(1).ok_or_else(|| {
871+
parquet::errors::ParquetError::General(format!(
872+
"No fetch range covers requested range {}..{}",
873+
range.start, range.end
874+
))
875+
})?;
865876
let fetch_range = &fetch_ranges[idx];
866877
let fetch_bytes = &fetched[idx];
867878
let start = (range.start - fetch_range.start) as usize;
868879
let end = (range.end - fetch_range.start) as usize;
869-
fetch_bytes.slice(start..end.min(fetch_bytes.len()))
880+
if end > fetch_bytes.len() {
881+
return Err(parquet::errors::ParquetError::General(format!(
882+
"Fetched data too short for range {}..{}: \
883+
expected at least {} bytes from fetch range {}..{}, got {}",
884+
range.start,
885+
range.end,
886+
end,
887+
fetch_range.start,
888+
fetch_range.end,
889+
fetch_bytes.len()
890+
)));
891+
}
892+
Ok(fetch_bytes.slice(start..end))
870893
})
871-
.collect())
894+
.collect();
895+
result
872896
}
873897
.boxed()
874898
}
@@ -954,24 +978,35 @@ fn split_ranges_for_concurrency(
954978

955979
while result.len() < target_count {
956980
// Pick the largest range.
957-
let (idx, _) = result
981+
let (idx, largest) = result
958982
.iter()
959983
.enumerate()
960984
.max_by_key(|(_, r)| r.end - r.start)
961985
.unwrap();
962986

987+
let largest_size = largest.end - largest.start;
988+
989+
// Don't split if the range is smaller than 2 * MIN_SPLIT_SIZE,
990+
// because both halves would end up below the batch threshold.
991+
if largest_size < MIN_SPLIT_SIZE * 2 {
992+
break;
993+
}
994+
963995
let range = &result[idx];
996+
// Each half must be at least MIN_SPLIT_SIZE.
997+
let expected_size = MIN_SPLIT_SIZE.max(largest_size / target_count as u64 + 1);
964998
let mid = range.start + (range.end - range.start) / 2;
965999

966-
// Find the boundary closest to the midpoint that actually splits.
9671000
let best = boundaries
9681001
.iter()
9691002
.copied()
970-
.filter(|&b| b > range.start && b < range.end)
1003+
.filter(|&b| {
1004+
b >= range.start + expected_size && b <= range.end.saturating_sub(expected_size)
1005+
})
9711006
.min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs());
9721007

9731008
let Some(split_at) = best else {
974-
break; // No valid split point in the largest range; stop.
1009+
break; // No valid split point that keeps both halves large enough.
9751010
};
9761011

9771012
let left = range.start..split_at;
@@ -1089,29 +1124,6 @@ mod tests {
10891124
assert_eq!(result[0], 0..1000);
10901125
}
10911126

1092-
#[test]
1093-
fn test_split_single_range_multiple_originals() {
1094-
// One merged range containing 4 originals — bisect at boundaries.
1095-
let original = vec![0..200, 250..500, 550..750, 800..1000];
1096-
#[allow(clippy::single_range_in_vec_init)]
1097-
let merged = vec![0..1000];
1098-
let result = super::split_ranges_for_concurrency(merged, &original, 4);
1099-
assert_eq!(result.len(), 4);
1100-
assert_eq!(result[0].start, 0);
1101-
assert_eq!(result.last().unwrap().end, 1000);
1102-
for window in result.windows(2) {
1103-
assert_eq!(window[0].end, window[1].start);
1104-
}
1105-
for orig in &original {
1106-
assert!(
1107-
result
1108-
.iter()
1109-
.any(|r| r.start <= orig.start && r.end >= orig.end),
1110-
"original {orig:?} not fully contained"
1111-
);
1112-
}
1113-
}
1114-
11151127
#[test]
11161128
fn test_split_mixed_sizes() {
11171129
let original = vec![0..300, 400..700, 800..1000, 2000..2010];

0 commit comments

Comments
 (0)