Skip to content

Commit cc2032f

Browse files
author
umi
committed
fixSplit
1 parent 66b8fa8 commit cc2032f

1 file changed

Lines changed: 18 additions & 9 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 18 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -837,7 +837,7 @@ impl AsyncFileReader for ArrowFileReader {
837837
let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
838838
// Phase 2: Split large merged ranges to utilize concurrency,
839839
// but only at original range boundaries.
840-
let fetch_ranges = split_ranges_for_concurrency(coalesced, &ranges, concurrency);
840+
let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
841841

842842
// Fetch merged ranges concurrently.
843843
let r = &self.r;
@@ -863,8 +863,7 @@ impl AsyncFileReader for ArrowFileReader {
863863

864864
// Slice the fetched data back into the originally requested
865865
// ranges. A single original range may span multiple fetch
866-
// chunks (the Java `copyMultiBytesToBytes` approach), so we
867-
// copy from as many chunks as needed.
866+
// chunks, so we copy from as many chunks as needed.
868867
let result: parquet::errors::Result<Vec<Bytes>> = ranges
869868
.iter()
870869
.map(|range| {
@@ -1000,7 +999,6 @@ fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
1000999
/// avoid excessive small IO requests.
10011000
fn split_ranges_for_concurrency(
10021001
merged: Vec<Range<u64>>,
1003-
_original: &[Range<u64>],
10041002
target_count: usize,
10051003
) -> Vec<Range<u64>> {
10061004
if merged.is_empty() || target_count <= 1 {
@@ -1138,19 +1136,30 @@ mod tests {
11381136
// A single range smaller than 2 * MIN_SPLIT_SIZE should not be split.
11391137
#[allow(clippy::single_range_in_vec_init)]
11401138
let merged = vec![0..1000];
1141-
#[allow(clippy::single_range_in_vec_init)]
1142-
let original = vec![0..1000];
1143-
let result = super::split_ranges_for_concurrency(merged, &original, 4);
1139+
let result = super::split_ranges_for_concurrency(merged, 4);
11441140
assert_eq!(result.len(), 1);
11451141
assert_eq!(result[0], 0..1000);
11461142
}
11471143

1144+
#[test]
1145+
fn test_split_large_range_into_batches() {
1146+
let mb = 1024 * 1024u64;
1147+
let size = 40 * mb;
1148+
#[allow(clippy::single_range_in_vec_init)]
1149+
let merged = vec![0..size];
1150+
let result = super::split_ranges_for_concurrency(merged, 4);
1151+
assert!(result.len() > 1);
1152+
assert_eq!(result.first().unwrap().start, 0);
1153+
assert_eq!(result.last().unwrap().end, size);
1154+
for i in 1..result.len() {
1155+
assert_eq!(result[i].start, result[i - 1].end);
1156+
}
1157+
}
11481158

11491159
#[test]
11501160
fn test_split_empty() {
11511161
let merged: Vec<std::ops::Range<u64>> = vec![];
1152-
let original: Vec<std::ops::Range<u64>> = vec![];
1153-
let result = super::split_ranges_for_concurrency(merged, &original, 4);
1162+
let result = super::split_ranges_for_concurrency(merged, 4);
11541163
assert!(result.is_empty());
11551164
}
11561165
}

0 commit comments

Comments
 (0)