Skip to content

Commit 38830ef

Browse files
authored
feat(scan): support deletion vector cardinality (#200)
1 parent 9478259 commit 38830ef

6 files changed

Lines changed: 108 additions & 68 deletions

File tree

crates/paimon/src/spec/index_file_meta.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ use std::fmt::{Display, Formatter};
2020

2121
use indexmap::IndexMap;
2222

23+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24+
pub struct DeletionVectorMeta {
25+
pub offset: i32,
26+
pub length: i32,
27+
pub cardinality: Option<i64>,
28+
}
29+
2330
/// Metadata of index file.
2431
///
2532
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java>
@@ -44,7 +51,7 @@ pub struct IndexFileMeta {
4451
rename = "_DELETIONS_VECTORS_RANGES",
4552
alias = "_DELETION_VECTORS_RANGES"
4653
)]
47-
pub deletion_vectors_ranges: Option<IndexMap<String, (i32, i32)>>,
54+
pub deletion_vectors_ranges: Option<IndexMap<String, DeletionVectorMeta>>,
4855
}
4956

5057
impl Display for IndexFileMeta {
@@ -65,42 +72,56 @@ mod map_serde {
6572
use indexmap::IndexMap;
6673
use serde::{Deserialize, Deserializer, Serialize, Serializer};
6774

75+
use super::DeletionVectorMeta;
76+
6877
#[derive(Deserialize, Serialize)]
6978
struct Temp {
7079
f0: String,
7180
f1: i32,
7281
f2: i32,
82+
#[serde(default, rename = "_CARDINALITY")]
83+
cardinality: Option<i64>,
7384
}
7485

7586
pub fn serialize<S>(
76-
date: &Option<IndexMap<String, (i32, i32)>>,
87+
data: &Option<IndexMap<String, DeletionVectorMeta>>,
7788
s: S,
7889
) -> Result<S::Ok, S::Error>
7990
where
8091
S: Serializer,
8192
{
82-
match *date {
93+
match data {
8394
None => s.serialize_none(),
84-
Some(ref d) => s.collect_seq(d.iter().map(|(s, (i1, i2))| Temp {
85-
f0: s.into(),
86-
f1: *i1,
87-
f2: *i2,
95+
Some(d) => s.collect_seq(d.iter().map(|(path, meta)| Temp {
96+
f0: path.clone(),
97+
f1: meta.offset,
98+
f2: meta.length,
99+
cardinality: meta.cardinality,
88100
})),
89101
}
90102
}
91103

92104
#[allow(clippy::type_complexity)]
93105
pub fn deserialize<'de, D>(
94106
deserializer: D,
95-
) -> Result<Option<IndexMap<String, (i32, i32)>>, D::Error>
107+
) -> Result<Option<IndexMap<String, DeletionVectorMeta>>, D::Error>
96108
where
97109
D: Deserializer<'de>,
98110
{
99111
match Option::deserialize(deserializer)? {
100112
None => Ok(None),
101113
Some::<Vec<Temp>>(s) => Ok(Some(
102114
s.into_iter()
103-
.map(|t| (t.f0, (t.f1, t.f2)))
115+
.map(|t| {
116+
(
117+
t.f0,
118+
DeletionVectorMeta {
119+
offset: t.f1,
120+
length: t.f2,
121+
cardinality: t.cardinality,
122+
},
123+
)
124+
})
104125
.collect::<IndexMap<_, _>>(),
105126
)),
106127
}

crates/paimon/src/spec/index_manifest.rs

Lines changed: 32 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -90,54 +90,38 @@ mod tests {
9090
use indexmap::IndexMap;
9191

9292
use super::*;
93+
use crate::spec::DeletionVectorMeta;
9394

9495
#[test]
9596
fn test_read_index_manifest_file() {
9697
let workdir =
9798
std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}"));
9899
let path = workdir
99-
.join("tests/fixtures/manifest/index-manifest-85cc6729-f5af-431a-a1c3-ef45319328fb-0");
100+
.join("tests/fixtures/manifest/index-manifest-7e816ed9-9f3b-4786-9985-8937d4e07b6e-0");
100101
let source = std::fs::read(path.to_str().unwrap()).unwrap();
101-
let mut reader =
102-
serde_avro_fast::object_container_file_encoding::Reader::from_slice(source.as_slice())
103-
.unwrap();
104-
let res: Vec<_> = reader
105-
.deserialize::<IndexManifestEntry>()
106-
.collect::<Result<_, _>>()
107-
.unwrap();
102+
let res = IndexManifest::read_from_bytes(&source).unwrap();
108103
assert_eq!(
109104
res,
110-
vec![
111-
IndexManifestEntry {
112-
version: 1,
113-
kind: FileKind::Add,
114-
partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
115-
bucket: 0,
116-
index_file: IndexFileMeta {
117-
index_type: "HASH".into(),
118-
file_name: "index-a984b43a-c3fb-40b4-ad29-536343c239a6-0".into(),
119-
file_size: 16,
120-
row_count: 4,
121-
deletion_vectors_ranges: None,
122-
}
123-
},
124-
IndexManifestEntry {
125-
version: 1,
126-
kind: FileKind::Add,
127-
partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
128-
bucket: 0,
129-
index_file: IndexFileMeta {
130-
index_type: "DELETION_VECTORS".into(),
131-
file_name: "index-3f0986c5-4398-449b-be82-95f019d7a748-0".into(),
132-
file_size: 33,
133-
row_count: 1,
134-
deletion_vectors_ranges: Some(IndexMap::from([(
135-
"data-9b76122c-6bb5-4952-a946-b5bce29694a1-0.orc".into(),
136-
(1, 24)
137-
)])),
138-
}
105+
vec![IndexManifestEntry {
106+
version: 1,
107+
kind: FileKind::Add,
108+
partition: vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
109+
bucket: 0,
110+
index_file: IndexFileMeta {
111+
index_type: "DELETION_VECTORS".into(),
112+
file_name: "index-4326356b-aad7-4fd8-9d88-2bb6993c8ce9-0".into(),
113+
file_size: 35,
114+
row_count: 1,
115+
deletion_vectors_ranges: Some(IndexMap::from([(
116+
"data-a989fc44-a361-42c2-801f-e50baba95a92-0.parquet".into(),
117+
DeletionVectorMeta {
118+
offset: 1,
119+
length: 26,
120+
cardinality: Some(3),
121+
}
122+
)])),
139123
}
140-
]
124+
}]
141125
);
142126
}
143127

@@ -153,7 +137,14 @@ mod tests {
153137
file_name: "test1".into(),
154138
file_size: 33,
155139
row_count: 1,
156-
deletion_vectors_ranges: Some(IndexMap::from([("test1".into(), (1, 24))])),
140+
deletion_vectors_ranges: Some(IndexMap::from([(
141+
"test1".into(),
142+
DeletionVectorMeta {
143+
offset: 1,
144+
length: 24,
145+
cardinality: Some(7),
146+
},
147+
)])),
157148
},
158149
};
159150

@@ -180,7 +171,8 @@ mod tests {
180171
"fields": [
181172
{"name": "f0", "type": "string"},
182173
{"name": "f1", "type": "int"},
183-
{"name": "f2", "type": "int"}
174+
{"name": "f2", "type": "int"},
175+
{"name": "_CARDINALITY", "type": ["null", "long"], "default": null}
184176
]
185177
}]
186178
}]

crates/paimon/src/table/source.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -168,18 +168,6 @@ impl DataSplit {
168168
format!("{}/{}", base, file.file_name)
169169
}
170170

171-
/// Iterate over each data file in this split, yielding `(path, &DataFileMeta)`.
172-
/// Use this to read each data file one by one (e.g. in ArrowReader).
173-
pub fn data_file_entries(&self) -> impl Iterator<Item = (String, &DataFileMeta)> + '_ {
174-
let base = self.bucket_path.trim_end_matches('/');
175-
// todo: consider partition table
176-
// todo: consider external path
177-
self.data_files.iter().map(move |file| {
178-
let path = format!("{}/{}", base, file.file_name);
179-
(path, file)
180-
})
181-
}
182-
183171
/// Total row count of all data files in this split.
184172
pub fn row_count(&self) -> i64 {
185173
self.data_files.iter().map(|f| f.row_count).sum()

crates/paimon/src/table/table_scan.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,14 @@ fn build_deletion_files_map(
113113
let key = PartitionBucket::new(entry.partition.clone(), entry.bucket);
114114
let dv_path = format!("{}/{}", index_path_prefix, entry.index_file.file_name);
115115
let per_bucket = map.entry(key).or_default();
116-
for (data_file_name, (offset, length)) in ranges {
116+
for (data_file_name, meta) in ranges {
117117
per_bucket.insert(
118118
data_file_name.clone(),
119119
DeletionFile::new(
120120
dv_path.clone(),
121-
*offset as i64,
122-
*length as i64,
123-
// todo: consider cardinality
124-
None,
121+
meta.offset as i64,
122+
meta.length as i64,
123+
meta.cardinality,
125124
),
126125
);
127126
}
@@ -513,9 +512,11 @@ impl<'a> TableScan<'a> {
513512
mod tests {
514513
use super::{group_by_overlapping_row_id, partition_matches_predicate};
515514
use crate::spec::{
516-
stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, Datum, IntType,
517-
Predicate, PredicateBuilder, PredicateOperator, VarCharType,
515+
stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, Datum,
516+
DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, Predicate,
517+
PredicateBuilder, PredicateOperator, VarCharType,
518518
};
519+
use crate::table::source::DeletionFile;
519520
use crate::Error;
520521
use chrono::{DateTime, Utc};
521522

@@ -718,4 +719,42 @@ mod tests {
718719
// Sorted by descending max_sequence_number: b(3), c(2), a(1)
719720
assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
720721
}
722+
723+
#[test]
724+
fn test_build_deletion_files_map_preserves_cardinality() {
725+
let entries = vec![IndexManifestEntry {
726+
version: 1,
727+
kind: FileKind::Add,
728+
partition: vec![1, 2, 3],
729+
bucket: 7,
730+
index_file: IndexFileMeta {
731+
index_type: "DELETION_VECTORS".into(),
732+
file_name: "index-file".into(),
733+
file_size: 128,
734+
row_count: 1,
735+
deletion_vectors_ranges: Some(indexmap::IndexMap::from([(
736+
"data-file.parquet".into(),
737+
DeletionVectorMeta {
738+
offset: 11,
739+
length: 22,
740+
cardinality: Some(33),
741+
},
742+
)])),
743+
},
744+
}];
745+
746+
let map = super::build_deletion_files_map(&entries, "file:/tmp/table");
747+
748+
let by_bucket = map
749+
.get(&super::PartitionBucket::new(vec![1, 2, 3], 7))
750+
.expect("partition bucket should exist");
751+
let deletion_file = by_bucket
752+
.get("data-file.parquet")
753+
.expect("deletion file should exist");
754+
755+
assert_eq!(
756+
deletion_file,
757+
&DeletionFile::new("file:/tmp/table/index/index-file".into(), 11, 22, Some(33))
758+
);
759+
}
721760
}
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)