Skip to content

Commit 66b8fa8

Browse files
author
umi
committed
fixSplit
1 parent 0149510 commit 66b8fa8

1 file changed

Lines changed: 85 additions & 95 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 85 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -861,34 +861,71 @@ impl AsyncFileReader for ArrowFileReader {
861861
.await?
862862
};
863863

864-
// Slice the fetched data back into the originally requested ranges.
864+
// Slice the fetched data back into the originally requested
865+
// ranges. A single original range may span multiple fetch
866+
// chunks (the Java `copyMultiBytesToBytes` approach), so we
867+
// copy from as many chunks as needed.
865868
let result: parquet::errors::Result<Vec<Bytes>> = ranges
866869
.iter()
867870
.map(|range| {
868-
let pp = fetch_ranges.partition_point(|v| v.start <= range.start);
869-
let idx = pp.checked_sub(1).ok_or_else(|| {
870-
parquet::errors::ParquetError::General(format!(
871+
// Find the first fetch chunk whose end is past range.start.
872+
let first = fetch_ranges.partition_point(|v| v.end <= range.start);
873+
if first >= fetch_ranges.len() {
874+
return Err(parquet::errors::ParquetError::General(format!(
871875
"No fetch range covers requested range {}..{}",
872876
range.start, range.end
873-
))
874-
})?;
875-
let fetch_range = &fetch_ranges[idx];
876-
let fetch_bytes = &fetched[idx];
877-
let start = (range.start - fetch_range.start) as usize;
878-
let end = (range.end - fetch_range.start) as usize;
879-
if end > fetch_bytes.len() {
877+
)));
878+
}
879+
880+
let need = (range.end - range.start) as usize;
881+
882+
// Fast path: the original range fits entirely within one
883+
// fetch chunk — zero-copy slice.
884+
let fr = &fetch_ranges[first];
885+
if range.end <= fr.end {
886+
let start = (range.start - fr.start) as usize;
887+
let end = (range.end - fr.start) as usize;
888+
return Ok(fetched[first].slice(start..end));
889+
}
890+
891+
// Slow path: the original range spans multiple fetch
892+
// chunks — copy pieces into a new buffer (mirrors Java's
893+
// copyMultiBytesToBytes).
894+
let mut buf = Vec::with_capacity(need);
895+
let mut pos = range.start;
896+
for i in first..fetch_ranges.len() {
897+
if pos >= range.end {
898+
break;
899+
}
900+
let fr = &fetch_ranges[i];
901+
let chunk = &fetched[i];
902+
let src_start = (pos - fr.start) as usize;
903+
let src_end = ((range.end.min(fr.end)) - fr.start) as usize;
904+
if src_end > chunk.len() {
905+
return Err(parquet::errors::ParquetError::General(format!(
906+
"Fetched data too short for range {}..{}: \
907+
chunk {}..{} has {} bytes, need up to offset {}",
908+
range.start,
909+
range.end,
910+
fr.start,
911+
fr.end,
912+
chunk.len(),
913+
src_end,
914+
)));
915+
}
916+
buf.extend_from_slice(&chunk[src_start..src_end]);
917+
pos = fr.end;
918+
}
919+
if buf.len() != need {
880920
return Err(parquet::errors::ParquetError::General(format!(
881-
"Fetched data too short for range {}..{}: \
882-
expected at least {} bytes from fetch range {}..{}, got {}",
921+
"Assembled {} bytes for range {}..{}, expected {}",
922+
buf.len(),
883923
range.start,
884924
range.end,
885-
end,
886-
fetch_range.start,
887-
fetch_range.end,
888-
fetch_bytes.len()
925+
need,
889926
)));
890927
}
891-
Ok(fetch_bytes.slice(start..end))
928+
Ok(Bytes::from(buf))
892929
})
893930
.collect();
894931
result
@@ -956,62 +993,47 @@ fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
956993
merged
957994
}
958995

959-
/// Split merged ranges to utilize concurrency by repeatedly bisecting the
960-
/// largest range at the nearest original-range boundary. This guarantees
961-
/// every original range stays fully inside one fetch range.
996+
/// Split merged ranges into fixed-size batches to utilize concurrency.
997+
/// Each merged range is divided into chunks of `expected_size`,
998+
/// with the last chunk taking whatever remains.
999+
/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to
1000+
/// avoid excessive small IO requests.
9621001
fn split_ranges_for_concurrency(
9631002
merged: Vec<Range<u64>>,
964-
original: &[Range<u64>],
1003+
_original: &[Range<u64>],
9651004
target_count: usize,
9661005
) -> Vec<Range<u64>> {
967-
if merged.is_empty() || target_count <= 1 || merged.len() >= target_count {
1006+
if merged.is_empty() || target_count <= 1 {
9681007
return merged;
9691008
}
9701009

971-
// Collect all original-range start points as candidate split boundaries.
972-
let mut boundaries: Vec<u64> = original.iter().map(|r| r.start).collect();
973-
boundaries.sort_unstable();
974-
boundaries.dedup();
975-
976-
let mut result = merged;
977-
978-
while result.len() < target_count {
979-
// Pick the largest range.
980-
let (idx, largest) = result
981-
.iter()
982-
.enumerate()
983-
.max_by_key(|(_, r)| r.end - r.start)
984-
.unwrap();
1010+
let mut result = Vec::with_capacity(merged.len());
9851011

986-
let largest_size = largest.end - largest.start;
1012+
for range in &merged {
1013+
let length = range.end - range.start;
9871014

988-
// Don't split if the range is smaller than 2 * MIN_SPLIT_SIZE,
989-
// because both halves would end up below the batch threshold.
990-
if largest_size < MIN_SPLIT_SIZE * 2 {
991-
break;
1015+
if length < MIN_SPLIT_SIZE * 2 {
1016+
result.push(range.clone());
1017+
continue;
9921018
}
9931019

994-
let range = &result[idx];
995-
// Each half must be at least MIN_SPLIT_SIZE.
996-
let expected_size = MIN_SPLIT_SIZE.max(largest_size / target_count as u64 + 1);
997-
let mid = range.start + (range.end - range.start) / 2;
1020+
let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1);
1021+
let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2);
9981022

999-
let best = boundaries
1000-
.iter()
1001-
.copied()
1002-
.filter(|&b| {
1003-
b >= range.start + expected_size && b <= range.end.saturating_sub(expected_size)
1004-
})
1005-
.min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs());
1023+
let mut offset = range.start;
1024+
let end = range.end;
10061025

1007-
let Some(split_at) = best else {
1008-
break; // No valid split point that keeps both halves large enough.
1009-
};
1010-
1011-
let left = range.start..split_at;
1012-
let right = split_at..range.end;
1013-
result[idx] = left;
1014-
result.insert(idx + 1, right);
1026+
loop {
1027+
if offset + min_remain > end {
1028+
if offset < end {
1029+
result.push(offset..end);
1030+
}
1031+
break;
1032+
} else {
1033+
result.push(offset..offset + expected_size);
1034+
offset += expected_size;
1035+
}
1036+
}
10151037
}
10161038

10171039
result
@@ -1112,8 +1134,8 @@ mod tests {
11121134
// -----------------------------------------------------------------------
11131135

11141136
#[test]
1115-
fn test_split_single_range() {
1116-
// One merged range from a single original — no boundary to split at.
1137+
fn test_split_single_small_range() {
1138+
// A single range smaller than 2 * MIN_SPLIT_SIZE should not be split.
11171139
#[allow(clippy::single_range_in_vec_init)]
11181140
let merged = vec![0..1000];
11191141
#[allow(clippy::single_range_in_vec_init)]
@@ -1123,21 +1145,6 @@ mod tests {
11231145
assert_eq!(result[0], 0..1000);
11241146
}
11251147

1126-
#[test]
1127-
fn test_split_mixed_sizes() {
1128-
let original = vec![0..300, 400..700, 800..1000, 2000..2010];
1129-
let merged = vec![0..1000, 2000..2010];
1130-
let result = super::split_ranges_for_concurrency(merged, &original, 4);
1131-
assert!(result.contains(&(2000..2010)));
1132-
for orig in &original {
1133-
assert!(
1134-
result
1135-
.iter()
1136-
.any(|r| r.start <= orig.start && r.end >= orig.end),
1137-
"original {orig:?} not fully contained"
1138-
);
1139-
}
1140-
}
11411148

11421149
#[test]
11431150
fn test_split_empty() {
@@ -1146,21 +1153,4 @@ mod tests {
11461153
let result = super::split_ranges_for_concurrency(merged, &original, 4);
11471154
assert!(result.is_empty());
11481155
}
1149-
1150-
#[test]
1151-
fn test_split_clustered_and_sparse() {
1152-
let original = vec![0..100, 150..250, 300..400, 1000..1010, 2000..2010];
1153-
let merged = vec![0..400, 1000..1010, 2000..2010];
1154-
let result = super::split_ranges_for_concurrency(merged, &original, 5);
1155-
assert!(result.contains(&(1000..1010)));
1156-
assert!(result.contains(&(2000..2010)));
1157-
for orig in &original {
1158-
assert!(
1159-
result
1160-
.iter()
1161-
.any(|r| r.start <= orig.start && r.end >= orig.end),
1162-
"original {orig:?} not fully contained"
1163-
);
1164-
}
1165-
}
11661156
}

0 commit comments

Comments
 (0)