Skip to content

Commit e160e7a

Browse files
author
umi
committed
fix
1 parent 3f10d81 commit e160e7a

1 file changed

Lines changed: 85 additions & 36 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 85 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,9 @@ impl AsyncFileReader for ArrowFileReader {
846846
// Two-phase range optimization:
847847
// Phase 1: Merge nearby ranges based on coalesce threshold.
848848
let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
849-
// Phase 2: Split large merged ranges to utilize concurrency.
850-
let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
849+
// Phase 2: Split large merged ranges to utilize concurrency,
850+
// but only at original range boundaries.
851+
let fetch_ranges = split_ranges_for_concurrency(coalesced, &ranges, concurrency);
851852

852853
// Fetch merged ranges concurrently.
853854
let r = &self.r;
@@ -947,32 +948,51 @@ fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
947948
merged
948949
}
949950

950-
fn split_ranges_for_concurrency(ranges: Vec<Range<u64>>, target_count: usize) -> Vec<Range<u64>> {
951-
if ranges.is_empty() || target_count <= 1 || ranges.len() >= target_count {
952-
return ranges;
951+
/// Split merged ranges to utilize concurrency by repeatedly bisecting the
952+
/// largest range at the original-range start nearest its midpoint. This guarantees
953+
/// every original range stays fully inside one fetch range (assuming originals do not overlap).
954+
fn split_ranges_for_concurrency(
955+
merged: Vec<Range<u64>>,
956+
original: &[Range<u64>],
957+
target_count: usize,
958+
) -> Vec<Range<u64>> {
959+
if merged.is_empty() || target_count <= 1 || merged.len() >= target_count {
960+
return merged;
953961
}
954962

955-
let mut result = ranges;
963+
// Collect all original-range start points as candidate split boundaries.
964+
let mut boundaries: Vec<u64> = original.iter().map(|r| r.start).collect();
965+
boundaries.sort_unstable();
966+
boundaries.dedup();
967+
968+
let mut result = merged;
956969

957970
while result.len() < target_count {
958-
// Find the largest range by byte size.
959-
let (largest_idx, largest_range) = result
971+
// Pick the largest range.
972+
let (idx, _) = result
960973
.iter()
961974
.enumerate()
962975
.max_by_key(|(_, r)| r.end - r.start)
963-
.expect("result is non-empty");
976+
.unwrap();
964977

965-
let range_size = largest_range.end - largest_range.start;
966-
if range_size <= 1 {
967-
break;
968-
}
978+
let range = &result[idx];
979+
let mid = range.start + (range.end - range.start) / 2;
980+
981+
// Find the boundary closest to the midpoint that lies strictly inside the range.
982+
let best = boundaries
983+
.iter()
984+
.copied()
985+
.filter(|&b| b > range.start && b < range.end)
986+
.min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs());
969987

970-
let mid = largest_range.start + range_size / 2;
971-
let left = largest_range.start..mid;
972-
let right = mid..largest_range.end;
988+
let Some(split_at) = best else {
989+
break; // No valid split point in the largest range; stop.
990+
};
973991

974-
result[largest_idx] = left;
975-
result.insert(largest_idx + 1, right);
992+
let left = range.start..split_at;
993+
let right = split_at..range.end;
994+
result[idx] = left;
995+
result.insert(idx + 1, right);
976996
}
977997

978998
result
@@ -1074,46 +1094,75 @@ mod tests {
10741094

10751095
#[test]
10761096
fn test_split_single_range() {
1077-
// One large range split into 4
1097+
// One merged range from a single original — no boundary to split at.
10781098
#[allow(clippy::single_range_in_vec_init)]
1079-
let ranges = vec![0..1000];
1080-
let result = super::split_ranges_for_concurrency(ranges, 4);
1099+
let merged = vec![0..1000];
1100+
let original = vec![0..1000];
1101+
let result = super::split_ranges_for_concurrency(merged, &original, 4);
1102+
assert_eq!(result.len(), 1);
1103+
assert_eq!(result[0], 0..1000);
1104+
}
1105+
1106+
#[test]
1107+
fn test_split_single_range_multiple_originals() {
1108+
// One merged range containing 4 originals — bisect at boundaries.
1109+
let original = vec![0..200, 250..500, 550..750, 800..1000];
1110+
let merged = vec![0..1000];
1111+
let result = super::split_ranges_for_concurrency(merged, &original, 4);
10811112
assert_eq!(result.len(), 4);
1082-
// All ranges should be contiguous and cover 0..1000
10831113
assert_eq!(result[0].start, 0);
10841114
assert_eq!(result.last().unwrap().end, 1000);
10851115
for window in result.windows(2) {
10861116
assert_eq!(window[0].end, window[1].start);
10871117
}
1118+
for orig in &original {
1119+
assert!(
1120+
result
1121+
.iter()
1122+
.any(|r| r.start <= orig.start && r.end >= orig.end),
1123+
"original {orig:?} not fully contained"
1124+
);
1125+
}
10881126
}
10891127

10901128
#[test]
10911129
fn test_split_mixed_sizes() {
1092-
// One large range + one small range, target=4
1093-
// Should split the large range, leave the small one alone
1094-
let ranges = vec![0..1000, 2000..2010];
1095-
let result = super::split_ranges_for_concurrency(ranges, 4);
1096-
assert_eq!(result.len(), 4);
1097-
// The small range (2000..2010) should remain intact
1130+
let original = vec![0..300, 400..700, 800..1000, 2000..2010];
1131+
let merged = vec![0..1000, 2000..2010];
1132+
let result = super::split_ranges_for_concurrency(merged, &original, 4);
10981133
assert!(result.contains(&(2000..2010)));
1134+
for orig in &original {
1135+
assert!(
1136+
result
1137+
.iter()
1138+
.any(|r| r.start <= orig.start && r.end >= orig.end),
1139+
"original {orig:?} not fully contained"
1140+
);
1141+
}
10991142
}
11001143

11011144
#[test]
11021145
fn test_split_empty() {
1103-
let ranges: Vec<std::ops::Range<u64>> = vec![];
1104-
let result = super::split_ranges_for_concurrency(ranges, 4);
1146+
let merged: Vec<std::ops::Range<u64>> = vec![];
1147+
let original: Vec<std::ops::Range<u64>> = vec![];
1148+
let result = super::split_ranges_for_concurrency(merged, &original, 4);
11051149
assert!(result.is_empty());
11061150
}
11071151

11081152
#[test]
11091153
fn test_split_clustered_and_sparse() {
1110-
// Simulates clustered + sparse columns:
1111-
// Clustered group merged into 0..400, sparse columns at 1000 and 2000
1112-
let ranges = vec![0..400, 1000..1010, 2000..2010];
1113-
let result = super::split_ranges_for_concurrency(ranges, 4);
1114-
assert_eq!(result.len(), 4);
1115-
// The large range should be split, small ones preserved
1154+
let original = vec![0..100, 150..250, 300..400, 1000..1010, 2000..2010];
1155+
let merged = vec![0..400, 1000..1010, 2000..2010];
1156+
let result = super::split_ranges_for_concurrency(merged, &original, 5);
11161157
assert!(result.contains(&(1000..1010)));
11171158
assert!(result.contains(&(2000..2010)));
1159+
for orig in &original {
1160+
assert!(
1161+
result
1162+
.iter()
1163+
.any(|r| r.start <= orig.start && r.end >= orig.end),
1164+
"original {orig:?} not fully contained"
1165+
);
1166+
}
11181167
}
11191168
}

0 commit comments

Comments
 (0)