Skip to content

Commit 587389b

Browse files
authored
feat: replace apache-avro with serde_avro_fast and parallelize manifest reads (#203)
Switch Avro deserialization from apache-avro (Value intermediate repr) to serde_avro_fast (direct bytes→struct), eliminating redundant allocations for ~10-20x deserialization speedup. Read manifest files concurrently with buffer_unordered(64) instead of sequentially.
1 parent 35b6386 commit 587389b

11 files changed

Lines changed: 95 additions & 91 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,5 @@ rust-version = "1.86.0"
3131
arrow-array = { version = "57.0", features = ["ffi"] }
3232
arrow-schema = "57.0"
3333
arrow-cast = "57.0"
34-
arrow-select = "57.0"
3534
parquet = "57.0"
3635
tokio = "1.39.2"

crates/paimon/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,12 @@ snafu = "0.9.0"
5151
typed-builder = "^0.19"
5252
opendal = { version = "0.55", features = ["services-fs"] }
5353
pretty_assertions = "1"
54-
apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
54+
serde_avro_fast = { version = "2.0.2", features = ["snappy", "zstandard"] }
5555
indexmap = "2.5.0"
5656
roaring = "0.11"
5757
arrow-array = { workspace = true }
5858
arrow-cast = { workspace = true }
5959
arrow-schema = { workspace = true }
60-
arrow-select = { workspace = true }
6160
futures = "0.3"
6261
parquet = { workspace = true, features = ["async", "zstd"] }
6362
async-stream = "0.3.6"
@@ -76,5 +75,5 @@ urlencoding = "2.1"
7675
[dev-dependencies]
7776
axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] }
7877
rand = "0.8.5"
79-
serde_avro_fast = { version = "2.0.2", features = ["snappy"] }
78+
8079
tempfile = "3"

crates/paimon/src/error.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,6 @@ pub enum Error {
6868
display("Paimon hitting invalid config: {}", message)
6969
)]
7070
ConfigInvalid { message: String },
71-
#[snafu(
72-
visibility(pub(crate)),
73-
display("Paimon hitting unexpected avro error {}: {:?}", message, source)
74-
)]
75-
DataUnexpected {
76-
message: String,
77-
source: Box<apache_avro::Error>,
78-
},
7971
#[snafu(
8072
visibility(pub(crate)),
8173
display("Paimon hitting invalid file index format: {}", message)
@@ -127,15 +119,6 @@ impl From<opendal::Error> for Error {
127119
}
128120
}
129121

130-
impl From<apache_avro::Error> for Error {
131-
fn from(source: apache_avro::Error) -> Self {
132-
Error::DataUnexpected {
133-
message: "".to_string(),
134-
source: Box::new(source),
135-
}
136-
}
137-
}
138-
139122
impl From<parquet::errors::ParquetError> for Error {
140123
fn from(source: parquet::errors::ParquetError) -> Self {
141124
Error::ParquetDataUnexpected {

crates/paimon/src/spec/data_file.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,14 @@ pub struct DataFileMeta {
412412
skip_serializing_if = "Option::is_none"
413413
)]
414414
pub write_cols: Option<Vec<String>>,
415+
416+
/// External path for the data file (e.g. when data is stored outside the table directory).
417+
#[serde(
418+
rename = "_EXTERNAL_PATH",
419+
default,
420+
skip_serializing_if = "Option::is_none"
421+
)]
422+
pub external_path: Option<String>,
415423
}
416424

417425
impl Display for DataFileMeta {

crates/paimon/src/spec/index_manifest.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@
1818
use crate::io::FileIO;
1919
use crate::spec::manifest_common::FileKind;
2020
use crate::spec::IndexFileMeta;
21-
use apache_avro::types::Value;
22-
use apache_avro::{from_value, Reader};
2321
use serde::{Deserialize, Serialize};
22+
use serde_avro_fast::object_container_file_encoding::Reader;
23+
use snafu::ResultExt;
2424
use std::fmt::{Display, Formatter};
2525

26-
use crate::Error;
2726
use crate::Result;
2827

2928
/// Manifest entry for index file.
@@ -76,12 +75,12 @@ impl IndexManifest {
7675

7776
/// Read index manifest entries from Avro-encoded bytes.
7877
pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
79-
let reader = Reader::new(bytes).map_err(Error::from)?;
80-
let records = reader
81-
.collect::<std::result::Result<Vec<Value>, _>>()
82-
.map_err(Error::from)?;
83-
let values = Value::Array(records);
84-
from_value::<Vec<IndexManifestEntry>>(&values).map_err(Error::from)
78+
let mut reader = Reader::from_slice(bytes)
79+
.whatever_context::<_, crate::Error>("read index manifest avro")?;
80+
reader
81+
.deserialize::<IndexManifestEntry>()
82+
.collect::<std::result::Result<Vec<_>, _>>()
83+
.whatever_context::<_, crate::Error>("deserialize index manifest entry")
8584
}
8685
}
8786

crates/paimon/src/spec/manifest.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
use crate::io::FileIO;
1919
use crate::spec::manifest_entry::ManifestEntry;
20-
use apache_avro::types::Value;
21-
use apache_avro::{from_value, Reader};
20+
use serde_avro_fast::object_container_file_encoding::Reader;
21+
use snafu::ResultExt;
2222

23-
use crate::Error;
2423
use crate::Result;
2524

2625
/// Manifest file reader and writer.
@@ -60,12 +59,12 @@ impl Manifest {
6059
/// # Returns
6160
/// A vector of ManifestEntry records
6261
fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> {
63-
let reader = Reader::new(bytes).map_err(Error::from)?;
64-
let records = reader
65-
.collect::<std::result::Result<Vec<Value>, _>>()
66-
.map_err(Error::from)?;
67-
let values = Value::Array(records);
68-
from_value::<Vec<ManifestEntry>>(&values).map_err(Error::from)
62+
let mut reader =
63+
Reader::from_slice(bytes).whatever_context::<_, crate::Error>("read manifest avro")?;
64+
reader
65+
.deserialize::<ManifestEntry>()
66+
.collect::<std::result::Result<Vec<_>, _>>()
67+
.whatever_context::<_, crate::Error>("deserialize manifest entry")
6968
}
7069
}
7170

crates/paimon/src/spec/manifest_entry.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ pub struct Identifier {
2828
pub bucket: i32,
2929
pub level: i32,
3030
pub file_name: String,
31+
pub extra_files: Vec<String>,
32+
pub embedded_index: Option<Vec<u8>>,
33+
pub external_path: Option<String>,
3134
}
3235

3336
/// Entry of a manifest file, representing an addition / deletion of a data file.
@@ -90,6 +93,9 @@ impl ManifestEntry {
9093
bucket: self.bucket,
9194
level: self.file.level,
9295
file_name: self.file.file_name.clone(),
96+
extra_files: self.file.extra_files.clone(),
97+
embedded_index: self.file.embedded_index.clone(),
98+
external_path: self.file.external_path.clone(),
9399
}
94100
}
95101

crates/paimon/src/spec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub use manifest::Manifest;
4747
mod manifest_common;
4848
pub use manifest_common::FileKind;
4949
mod manifest_entry;
50+
pub use manifest_entry::Identifier;
5051
pub use manifest_entry::ManifestEntry;
5152
mod objects_file;
5253
pub use objects_file::from_avro_bytes;

crates/paimon/src/spec/objects_file.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::Error;
19-
use apache_avro::types::Value;
20-
use apache_avro::{from_value, Reader};
2118
use serde::de::DeserializeOwned;
19+
use serde_avro_fast::object_container_file_encoding::Reader;
20+
use snafu::ResultExt;
2221

2322
#[allow(dead_code)]
2423
pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T>> {
25-
let reader = Reader::new(bytes).map_err(Error::from)?;
26-
let records = reader
27-
.collect::<Result<Vec<Value>, _>>()
28-
.map_err(Error::from)?;
29-
let values = Value::Array(records);
30-
from_value::<Vec<T>>(&values).map_err(Error::from)
24+
let mut reader = Reader::from_slice(bytes)
25+
.whatever_context::<_, crate::Error>("read avro object container")?;
26+
reader
27+
.deserialize::<T>()
28+
.collect::<std::result::Result<Vec<_>, _>>()
29+
.whatever_context::<_, crate::Error>("deserialize avro records")
3130
}
3231

3332
#[cfg(test)]
@@ -122,6 +121,7 @@ mod tests {
122121
embedded_index: None,
123122
first_row_id: None,
124123
write_cols: None,
124+
external_path: None,
125125
},
126126
2
127127
),
@@ -158,6 +158,7 @@ mod tests {
158158
embedded_index: None,
159159
first_row_id: None,
160160
write_cols: None,
161+
external_path: None,
161162
},
162163
2
163164
),

crates/paimon/src/table/bin_pack.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ mod tests {
9898
embedded_index: None,
9999
first_row_id: None,
100100
write_cols: None,
101+
external_path: None,
101102
}
102103
}
103104

0 commit comments

Comments
 (0)