Skip to content

Commit 317f7d7

Browse files
author
umi
committed
averPara
1 parent ba47ac9 commit 317f7d7

2 files changed

Lines changed: 132 additions & 70 deletions

File tree

crates/paimon/src/arrow/format/parquet.rs

Lines changed: 130 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -772,10 +772,14 @@ fn build_row_ranges_selection(
772772

773773
/// ArrowFileReader is a wrapper around a FileRead that implements parquet's `AsyncFileReader`.
774774
///
775-
/// Supports range coalescing to reduce the number of object-store round-trips
776-
/// when reading column chunks from remote storage.
775+
/// # TODO
777776
///
778-
/// Inspired by iceberg-rust's `ArrowFileReader` (PR #2181).
777+
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
778+
/// contains the following hints to speed up metadata loading; similar to iceberg, we can consider adding them to this struct:
779+
///
780+
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
781+
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
782+
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
779783
struct ArrowFileReader {
780784
file_size: u64,
781785
r: Box<dyn FileRead>,
@@ -794,7 +798,7 @@ struct ArrowFileReader {
794798
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
795799
/// Default concurrent range fetches.
796800
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 8;
797-
/// Default metadata prefetch hint: 512 KiB (same as DataFusion's default).
801+
/// Default metadata prefetch hint: 512 KiB.
798802
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
799803

800804
impl ArrowFileReader {
@@ -828,8 +832,8 @@ impl AsyncFileReader for ArrowFileReader {
828832
}
829833

830834
fn get_byte_ranges(
831-
&mut self,
832-
ranges: Vec<Range<u64>>,
835+
&mut self,
836+
ranges: Vec<Range<u64>>,
833837
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
834838
let coalesce_bytes = self.range_coalesce_bytes;
835839
let concurrency = self.range_fetch_concurrency.max(1);
@@ -839,26 +843,19 @@ impl AsyncFileReader for ArrowFileReader {
839843
return Ok(vec![]);
840844
}
841845

842-
// Calculate max merged range size to ensure enough ranges for concurrency.
843-
// For column-pruned reads, ranges are naturally spread out so this has no effect.
844-
// For full-table reads, this prevents everything from merging into 1 huge range.
845-
let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum();
846-
let max_merge_bytes = if concurrency > 1 {
847-
(total_bytes / concurrency as u64).max(1)
848-
} else {
849-
u64::MAX
850-
};
851-
852-
let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes);
846+
// Two-phase range optimization:
847+
// Phase 1: Merge nearby ranges based on coalesce threshold.
848+
let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
849+
// Phase 2: Split large merged ranges to utilize concurrency.
850+
let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
853851

854852
// Fetch merged ranges concurrently.
855853
let r = &self.r;
856854
let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
857855
// All ranges fit within the concurrency limit — fire them all at once.
858856
futures::future::try_join_all(fetch_ranges.iter().map(|range| {
859-
r.read(range.clone()).map_err(|e| {
860-
parquet::errors::ParquetError::External(format!("{e}").into())
861-
})
857+
r.read(range.clone())
858+
.map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
862859
}))
863860
.await?
864861
} else {
@@ -912,44 +909,74 @@ impl AsyncFileReader for ArrowFileReader {
912909
// Range coalescing
913910
// ---------------------------------------------------------------------------
914911

915-
/// Merge nearby byte ranges to reduce the number of object-store requests.
912+
/// Merge nearby byte ranges to reduce the number of requests.
916913
///
917914
/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
918915
/// The input does not need to be sorted.
919-
fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64, max_merge_bytes: u64) -> Vec<Range<u64>> {
920-
if ranges.is_empty() {
921-
return vec![];
922-
}
923-
924-
let mut sorted = ranges.to_vec();
925-
sorted.sort_unstable_by_key(|r| r.start);
926-
927-
let mut merged = Vec::with_capacity(sorted.len());
928-
let mut start_idx = 0;
929-
let mut end_idx = 1;
930-
931-
while start_idx != sorted.len() {
932-
let mut range_end = sorted[start_idx].end;
933-
934-
while end_idx != sorted.len()
935-
&& sorted[end_idx]
936-
.start
937-
.checked_sub(range_end)
938-
.map(|delta| delta <= coalesce)
939-
.unwrap_or(true)
940-
&& (sorted[end_idx].end - sorted[start_idx].start) <= max_merge_bytes
941-
{
942-
range_end = range_end.max(sorted[end_idx].end);
943-
end_idx += 1;
944-
}
945-
946-
merged.push(sorted[start_idx].start..range_end);
947-
start_idx = end_idx;
948-
end_idx += 1;
949-
}
950-
951-
merged
952-
}
916+
/// Merge nearby byte ranges to reduce the number of requests.
///
/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
/// The input does not need to be sorted.
fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    // Work on a copy sorted by start offset so a single left-to-right pass
    // can decide, for each range, whether it extends the previous merged
    // group or opens a new one.
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::with_capacity(sorted.len());
    for r in sorted {
        match merged.last_mut() {
            // `checked_sub` yields `None` when `r` starts before the current
            // group ends (overlap) — overlapping ranges always merge.
            // Otherwise merge only when the gap is within the threshold.
            Some(prev) if r.start.checked_sub(prev.end).map_or(true, |gap| gap <= coalesce) => {
                // A later-starting range may still end earlier, so keep the
                // maximum end seen so far for the group.
                prev.end = prev.end.max(r.end);
            }
            _ => merged.push(r),
        }
    }
    merged
}
949+
950+
/// Split merged ranges so that at least `target_count` ranges exist (when
/// byte sizes allow), letting concurrent fetches stay fully utilized.
///
/// Repeatedly bisects the currently-largest range in place until the target
/// count is reached or the largest range is too small to split.
fn split_ranges_for_concurrency(ranges: Vec<Range<u64>>, target_count: usize) -> Vec<Range<u64>> {
    // Nothing to do when splitting is not requested, there is no input,
    // or there are already enough ranges for the concurrency target.
    if target_count <= 1 || ranges.is_empty() || ranges.len() >= target_count {
        return ranges;
    }

    let mut out = ranges;
    while out.len() < target_count {
        // Locate the biggest range by byte size. Ties resolve to the last
        // occurrence, per `Iterator::max_by_key`.
        let (idx, biggest) = out
            .iter()
            .enumerate()
            .max_by_key(|(_, r)| r.end - r.start)
            .expect("out is non-empty");

        let size = biggest.end - biggest.start;
        // A 0- or 1-byte range cannot be bisected further.
        if size < 2 {
            break;
        }

        // Replace the range with its two halves, keeping order stable.
        let mid = biggest.start + size / 2;
        let (lower, upper) = (biggest.start..mid, mid..biggest.end);
        out[idx] = lower;
        out.insert(idx + 1, upper);
    }

    out
}
953980

954981
// ---------------------------------------------------------------------------
955982
// Tests
@@ -1047,13 +1074,6 @@ mod tests {
10471074
assert_eq!(merged, vec![0..600]);
10481075
}
10491076

1050-
#[test]
1051-
fn test_merge_byte_ranges_single() {
1052-
let ranges = vec![100..200];
1053-
let merged = super::merge_byte_ranges(&ranges, 1024);
1054-
assert_eq!(merged, vec![100..200]);
1055-
}
1056-
10571077
#[test]
10581078
fn test_merge_byte_ranges_zero_coalesce_adjacent() {
10591079
// With coalesce=0, adjacent ranges (gap=0) should still merge
@@ -1069,4 +1089,52 @@ mod tests {
10691089
let merged = super::merge_byte_ranges(&ranges, 0);
10701090
assert_eq!(merged, vec![0..100, 101..200]);
10711091
}
1092+
1093+
// -----------------------------------------------------------------------
1094+
// split_ranges_for_concurrency tests
1095+
// -----------------------------------------------------------------------
1096+
1097+
#[test]
fn test_split_single_range() {
    // A single 1000-byte range should be cut into 4 pieces.
    let result = super::split_ranges_for_concurrency(vec![0..1000], 4);
    assert_eq!(result.len(), 4);
    // The pieces must tile 0..1000 exactly: same start, same end, no gaps.
    assert_eq!(result.first().unwrap().start, 0);
    assert_eq!(result.last().unwrap().end, 1000);
    for pair in result.windows(2) {
        assert_eq!(pair[0].end, pair[1].start);
    }
}
1110+
1111+
#[test]
fn test_split_mixed_sizes() {
    // One big range plus one tiny range, target = 4: only the big range
    // should be divided, while the tiny one survives untouched.
    let input = vec![0..1000, 2000..2010];
    let result = super::split_ranges_for_concurrency(input, 4);
    assert_eq!(result.len(), 4);
    assert!(result.contains(&(2000..2010)));
}
1121+
1122+
#[test]
fn test_split_empty() {
    // No input ranges -> no output ranges, regardless of the target.
    let empty: Vec<std::ops::Range<u64>> = Vec::new();
    assert!(super::split_ranges_for_concurrency(empty, 4).is_empty());
}
1128+
1129+
#[test]
fn test_split_clustered_and_sparse() {
    // Mimics clustered + sparse columns: a clustered group already
    // coalesced into 0..400, plus two sparse columns far away.
    let input = vec![0..400, 1000..1010, 2000..2010];
    let result = super::split_ranges_for_concurrency(input, 4);
    assert_eq!(result.len(), 4);
    // Only the large range is split; the small ones are preserved.
    assert!(result.contains(&(1000..1010)));
    assert!(result.contains(&(2000..2010)));
}
10721140
}

crates/paimon/src/catalog/rest/rest_token_file_io.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,8 @@ impl RESTTokenFileIO {
9393
match token_guard.as_ref() {
9494
Some(token) => {
9595
// Merge catalog options (base) with token credentials (override)
96-
// token.token["fs.oss.endpoint"] = oss-cn-hangzhou.aliyuncs.com
97-
let mut token_with_endpoint = token.token.clone();
98-
token_with_endpoint.insert(
99-
"fs.oss.endpoint".to_string(),
100-
"oss-cn-hangzhou.aliyuncs.com".to_string(),
101-
);
102-
let base = self.catalog_options.to_map().clone();
103-
let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint));
96+
let merged_props =
97+
RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token));
10498
// Build FileIO with merged properties
10599
let mut builder = FileIO::from_path(&self.path)?;
106100
builder = builder.with_props(merged_props);

0 commit comments

Comments
 (0)