Skip to content

Commit ebcbd19

Browse files
author
umi
committed
fix
1 parent 3bc6a4e commit ebcbd19

2 files changed

Lines changed: 102 additions & 24 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 94 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -997,26 +997,40 @@ fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
997997
/// with the last chunk taking whatever remains.
998998
/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to
999999
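/// avoid excessive small IO requests, except that a range whose start is not `MIN_SPLIT_SIZE`-aligned is first cut at the next aligned offset when that offset falls inside the range.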
1000-
fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, target_count: usize) -> Vec<Range<u64>> {
1001-
if merged.is_empty() || target_count <= 1 {
1000+
fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, concurrency: usize) -> Vec<Range<u64>> {
1001+
if merged.is_empty() || concurrency <= 1 {
10021002
return merged;
10031003
}
10041004

10051005
let mut result = Vec::with_capacity(merged.len());
10061006

10071007
for range in &merged {
10081008
let length = range.end - range.start;
1009-
let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1);
1010-
let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2);
1009+
let raw_size = MIN_SPLIT_SIZE.max(length / concurrency as u64 + 1);
1010+
// Round up to the nearest multiple of MIN_SPLIT_SIZE (4 MB) so that
1011+
// every full-size chunk spans a whole number of 4 MB units; absolute 4 MB alignment of the boundaries additionally relies on the head-chunk step below.
1012+
let expected_size = raw_size.div_ceil(MIN_SPLIT_SIZE) * MIN_SPLIT_SIZE;
1013+
let min_tail_size = expected_size.max(MIN_SPLIT_SIZE * 2);
10111014

10121015
let mut offset = range.start;
10131016
let end = range.end;
10141017

1018+
// Align the first split boundary: if `offset` is not 4 MB-aligned,
1019+
// emit a short head chunk so that all subsequent chunks start on a
1020+
// 4 MB boundary.
1021+
let misalign = offset % MIN_SPLIT_SIZE;
1022+
if misalign != 0 {
1023+
let first_end = (offset - misalign + MIN_SPLIT_SIZE).min(end);
1024+
result.push(offset..first_end);
1025+
offset = first_end;
1026+
}
1027+
10151028
loop {
1016-
if offset + min_remain > end {
1017-
if offset < end {
1018-
result.push(offset..end);
1019-
}
1029+
if offset >= end {
1030+
break;
1031+
}
1032+
if end - offset < min_tail_size {
1033+
result.push(offset..end);
10201034
break;
10211035
} else {
10221036
result.push(offset..offset + expected_size);
@@ -1123,28 +1137,86 @@ mod tests {
11231137
// -----------------------------------------------------------------------
11241138

11251139
#[test]
1126-
fn test_split_single_small_range() {
1127-
// A single range smaller than 2 * MIN_SPLIT_SIZE should not be split.
1140+
fn test_split_aligned_range_0_to_20mb() {
1141+
// 0..20MB, concurrency=4:
1142+
// raw_size = max(4MB, 5MB+1) = 5MB+1
1143+
// expected_size = ceil((5MB+1)/4MB)*4MB = 8MB
1144+
// min_tail_size = max(8MB, 8MB) = 8MB
1145+
// No misalign. Chunks: [0..8, 8..16, 16..20]
1146+
let mb = 1024 * 1024u64;
11281147
#[allow(clippy::single_range_in_vec_init)]
1129-
let merged = vec![0..1000];
1148+
let merged = vec![0..20 * mb];
11301149
let result = super::split_ranges_for_concurrency(merged, 4);
1131-
assert_eq!(result.len(), 1);
1132-
assert_eq!(result[0], 0..1000);
1150+
assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]);
11331151
}
11341152

11351153
#[test]
1136-
fn test_split_large_range_into_batches() {
1154+
fn test_split_unaligned_start_6_to_14mb() {
1155+
// 6MB..14MB, concurrency=4:
1156+
// raw_size = max(4MB, 2MB+1) = 4MB
1157+
// expected_size = 4MB, min_tail_size = 8MB
1158+
// Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14.
1159+
// Result: [6..8, 8..14]
11371160
let mb = 1024 * 1024u64;
1138-
let size = 40 * mb;
11391161
#[allow(clippy::single_range_in_vec_init)]
1140-
let merged = vec![0..size];
1162+
let merged = vec![6 * mb..14 * mb];
11411163
let result = super::split_ranges_for_concurrency(merged, 4);
1142-
assert!(result.len() > 1);
1143-
assert_eq!(result.first().unwrap().start, 0);
1144-
assert_eq!(result.last().unwrap().end, size);
1145-
for i in 1..result.len() {
1146-
assert_eq!(result[i].start, result[i - 1].end);
1147-
}
1164+
assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]);
1165+
}
1166+
1167+
#[test]
1168+
fn test_split_unaligned_start_6_to_22mb() {
1169+
// 6MB..22MB, concurrency=4:
1170+
// raw_size = max(4MB, 4MB+1) = 4MB+1
1171+
// expected_size = ceil((4MB+1)/4MB)*4MB = 8MB
1172+
// min_tail_size = 8MB
1173+
// Head: 6..8MB. Loop: 8+8=16 ≤ 22 → 8..16; 16+8=24 > 22 → tail 16..22.
1174+
// Result: [6..8, 8..16, 16..22]
1175+
let mb = 1024 * 1024u64;
1176+
#[allow(clippy::single_range_in_vec_init)]
1177+
let merged = vec![6 * mb..22 * mb];
1178+
let result = super::split_ranges_for_concurrency(merged, 4);
1179+
assert_eq!(
1180+
result,
1181+
vec![6 * mb..8 * mb, 8 * mb..16 * mb, 16 * mb..22 * mb]
1182+
);
1183+
}
1184+
1185+
#[test]
1186+
fn test_split_already_aligned_8_to_24mb() {
1187+
// 8MB..24MB, concurrency=4:
1188+
// raw_size = max(4MB, 4MB+1) = 4MB+1
1189+
// expected_size = 8MB, min_tail_size = 8MB
1190+
// No misalign. Loop: 8+8=16 ≤ 24 → 8..16; 16+8=24 ≤ 24 → 16..24; offset=24 >= end → break.
1191+
// Result: [8..16, 16..24]
1192+
let mb = 1024 * 1024u64;
1193+
#[allow(clippy::single_range_in_vec_init)]
1194+
let merged = vec![8 * mb..24 * mb];
1195+
let result = super::split_ranges_for_concurrency(merged, 4);
1196+
assert_eq!(result, vec![8 * mb..16 * mb, 16 * mb..24 * mb]);
1197+
}
1198+
1199+
#[test]
1200+
fn test_split_multiple_ranges() {
1201+
// [0..20MB, 24..44MB], concurrency=4:
1202+
// Range 0..20MB → [0..8, 8..16, 16..20] (same as test above)
1203+
// Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign.
1204+
// 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44.
1205+
// Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44]
1206+
let mb = 1024 * 1024u64;
1207+
let merged = vec![0..20 * mb, 24 * mb..44 * mb];
1208+
let result = super::split_ranges_for_concurrency(merged, 4);
1209+
assert_eq!(
1210+
result,
1211+
vec![
1212+
0..8 * mb,
1213+
8 * mb..16 * mb,
1214+
16 * mb..20 * mb,
1215+
24 * mb..32 * mb,
1216+
32 * mb..40 * mb,
1217+
40 * mb..44 * mb,
1218+
]
1219+
);
11481220
}
11491221

11501222
#[test]

crates/paimon/src/catalog/rest/rest_token_file_io.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,14 @@ impl RESTTokenFileIO {
9393
match token_guard.as_ref() {
9494
Some(token) => {
9595
// Merge catalog options (base) with token credentials (override)
96-
let merged_props =
97-
RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token));
96+
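// NOTE(review): the endpoint below is hard-coded, so token.token["fs.oss.endpoint"] is always replaced with oss-cn-hangzhou.aliyuncs.com and, being on the override side of the merge, also wins over the catalog options — confirm this is intended for other regions.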
97+
let mut token_with_endpoint = token.token.clone();
98+
token_with_endpoint.insert(
99+
"fs.oss.endpoint".to_string(),
100+
"oss-cn-hangzhou.aliyuncs.com".to_string(),
101+
);
102+
let base = self.catalog_options.to_map().clone();
103+
let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint));
98104
// Build FileIO with merged properties
99105
let mut builder = FileIO::from_path(&self.path)?;
100106
builder = builder.with_props(merged_props);

0 commit comments

Comments
 (0)