Skip to content

Commit 9478259

Browse files
authored
feat: introduce time travel for data fusion (#195)
* feat: introduce time travel for data fusion
1 parent 1154bdb commit 9478259

10 files changed

Lines changed: 483 additions & 38 deletions

File tree

crates/integrations/datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ keywords = ["paimon", "datafusion", "integrations"]
2626

2727
[dependencies]
2828
async-trait = "0.1"
29+
chrono = "0.4"
2930
datafusion = { version = "52.3.0"}
3031
paimon = { path = "../../paimon" }
3132
futures = "0.3"
3233

3334
[dev-dependencies]
35+
serde_json = "1"
3436
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

crates/integrations/datafusion/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ mod catalog;
4040
mod error;
4141
mod filter_pushdown;
4242
mod physical_plan;
43+
mod relation_planner;
4344
mod schema;
4445
mod table;
4546

4647
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
4748
pub use error::to_datafusion_error;
4849
pub use physical_plan::PaimonTableScan;
50+
pub use relation_planner::PaimonRelationPlanner;
4951
pub use schema::paimon_schema_to_arrow;
5052
pub use table::PaimonTableProvider;
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`.
19+
20+
use std::collections::HashMap;
21+
use std::fmt::Debug;
22+
use std::sync::Arc;
23+
24+
use datafusion::catalog::default_table_source::{provider_as_source, source_as_provider};
25+
use datafusion::common::TableReference;
26+
use datafusion::error::Result as DFResult;
27+
use datafusion::logical_expr::builder::LogicalPlanBuilder;
28+
use datafusion::logical_expr::planner::{
29+
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
30+
};
31+
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
32+
use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
33+
34+
use crate::table::PaimonTableProvider;
35+
36+
/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
/// on Paimon tables and resolves them to time travel options.
///
/// - Integer literal → sets `scan.snapshot-id` option on the table.
/// - String literal → parsed as a timestamp, sets `scan.timestamp-millis` option.
#[derive(Debug)]
pub struct PaimonRelationPlanner;

impl PaimonRelationPlanner {
    /// Build a planner instance; the planner itself is stateless.
    pub fn new() -> Self {
        Self
    }
}

impl Default for PaimonRelationPlanner {
    fn default() -> Self {
        Self::new()
    }
}
55+
56+
impl RelationPlanner for PaimonRelationPlanner {
57+
fn plan_relation(
58+
&self,
59+
relation: TableFactor,
60+
context: &mut dyn RelationPlannerContext,
61+
) -> DFResult<RelationPlanning> {
62+
// Only handle Table factors with a version clause.
63+
let TableFactor::Table {
64+
ref name,
65+
ref version,
66+
..
67+
} = relation
68+
else {
69+
return Ok(RelationPlanning::Original(relation));
70+
};
71+
72+
let version_expr = match version {
73+
Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
74+
_ => return Ok(RelationPlanning::Original(relation)),
75+
};
76+
77+
// Resolve the table reference.
78+
let table_ref = object_name_to_table_reference(name, context)?;
79+
let source = context
80+
.context_provider()
81+
.get_table_source(table_ref.clone())?;
82+
let provider = source_as_provider(&source)?;
83+
84+
// Check if this is a Paimon table.
85+
let Some(paimon_provider) = provider.as_any().downcast_ref::<PaimonTableProvider>() else {
86+
return Ok(RelationPlanning::Original(relation));
87+
};
88+
89+
let extra_options = resolve_time_travel_options(&version_expr)?;
90+
let new_table = paimon_provider.table().copy_with_options(extra_options);
91+
let new_provider = PaimonTableProvider::try_new(new_table)?;
92+
let new_source = provider_as_source(Arc::new(new_provider));
93+
94+
// Destructure to get alias.
95+
let TableFactor::Table { alias, .. } = relation else {
96+
unreachable!()
97+
};
98+
99+
let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?;
100+
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
101+
}
102+
}
103+
104+
/// Convert a sqlparser `ObjectName` to a DataFusion `TableReference`.
105+
fn object_name_to_table_reference(
106+
name: &ast::ObjectName,
107+
context: &mut dyn RelationPlannerContext,
108+
) -> DFResult<TableReference> {
109+
let idents: Vec<String> = name
110+
.0
111+
.iter()
112+
.map(|part| {
113+
let ident = part.as_ident().ok_or_else(|| {
114+
datafusion::error::DataFusionError::Plan(format!(
115+
"Expected simple identifier in table reference, got: {part}"
116+
))
117+
})?;
118+
Ok(context.normalize_ident(ident.clone()))
119+
})
120+
.collect::<DFResult<_>>()?;
121+
match idents.len() {
122+
1 => Ok(TableReference::bare(idents[0].clone())),
123+
2 => Ok(TableReference::partial(
124+
idents[0].clone(),
125+
idents[1].clone(),
126+
)),
127+
3 => Ok(TableReference::full(
128+
idents[0].clone(),
129+
idents[1].clone(),
130+
idents[2].clone(),
131+
)),
132+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
133+
"Unsupported table reference: {name}"
134+
))),
135+
}
136+
}
137+
138+
/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
139+
///
140+
/// - Integer literal → `{"scan.snapshot-id": "N"}`
141+
/// - String literal (timestamp) → parse to millis → `{"scan.timestamp-millis": "M"}`
142+
fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
143+
match expr {
144+
ast::Expr::Value(v) => match &v.value {
145+
ast::Value::Number(n, _) => {
146+
// Validate it's a valid integer
147+
n.parse::<i64>().map_err(|e| {
148+
datafusion::error::DataFusionError::Plan(format!(
149+
"Invalid snapshot id '{n}': {e}"
150+
))
151+
})?;
152+
Ok(HashMap::from([(
153+
SCAN_SNAPSHOT_ID_OPTION.to_string(),
154+
n.clone(),
155+
)]))
156+
}
157+
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
158+
let timestamp_millis = parse_timestamp_to_millis(s)?;
159+
Ok(HashMap::from([(
160+
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
161+
timestamp_millis.to_string(),
162+
)]))
163+
}
164+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
165+
"Unsupported time travel expression: {expr}"
166+
))),
167+
},
168+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
169+
"Unsupported time travel expression: {expr}. Expected an integer snapshot id or a timestamp string."
170+
))),
171+
}
172+
}
173+
174+
/// Parse a timestamp string to milliseconds since epoch (using local timezone).
175+
///
176+
/// Matches Java Paimon's behavior which uses `TimeZone.getDefault()`.
177+
fn parse_timestamp_to_millis(ts: &str) -> DFResult<i64> {
178+
use chrono::{Local, NaiveDateTime, TimeZone};
179+
180+
let naive = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d %H:%M:%S").map_err(|e| {
181+
datafusion::error::DataFusionError::Plan(format!(
182+
"Cannot parse time travel timestamp '{ts}': {e}. Expected format: YYYY-MM-DD HH:MM:SS"
183+
))
184+
})?;
185+
let local = Local.from_local_datetime(&naive).single().ok_or_else(|| {
186+
datafusion::error::DataFusionError::Plan(format!("Ambiguous or invalid local time: '{ts}'"))
187+
})?;
188+
Ok(local.timestamp_millis())
189+
}

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown};
2424
use datafusion::prelude::{SessionConfig, SessionContext};
2525
use paimon::catalog::Identifier;
2626
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
27-
use paimon_datafusion::{PaimonCatalogProvider, PaimonTableProvider};
27+
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonTableProvider};
2828

2929
fn get_test_warehouse() -> String {
3030
std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string())
@@ -363,3 +363,64 @@ async fn test_missing_database_returns_no_schema() {
363363
"missing databases should not resolve to a schema provider"
364364
);
365365
}
366+
367+
// ======================= Time Travel Tests =======================
368+
369+
/// Helper: create a SessionContext with catalog + relation planner for time travel.
370+
/// Uses BigQuery dialect to enable `FOR SYSTEM_TIME AS OF` syntax.
371+
async fn create_time_travel_context() -> SessionContext {
372+
let catalog = create_catalog();
373+
let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
374+
let ctx = SessionContext::new_with_config(config);
375+
ctx.register_catalog(
376+
"paimon",
377+
Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))),
378+
);
379+
ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
380+
.expect("Failed to register relation planner");
381+
ctx
382+
}
383+
384+
#[tokio::test]
385+
async fn test_time_travel_by_snapshot_id() {
386+
let ctx = create_time_travel_context().await;
387+
388+
// Snapshot 1: should contain only the first insert (alice, bob)
389+
let batches = ctx
390+
.sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 1")
391+
.await
392+
.expect("time travel query should parse")
393+
.collect()
394+
.await
395+
.expect("time travel query should execute");
396+
397+
let mut rows = extract_id_name_rows(&batches);
398+
rows.sort_by_key(|(id, _)| *id);
399+
assert_eq!(
400+
rows,
401+
vec![(1, "alice".to_string()), (2, "bob".to_string())],
402+
"Snapshot 1 should contain only the first batch of rows"
403+
);
404+
405+
// Snapshot 2 (latest): should contain all rows
406+
let batches = ctx
407+
.sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 2")
408+
.await
409+
.expect("time travel query should parse")
410+
.collect()
411+
.await
412+
.expect("time travel query should execute");
413+
414+
let mut rows = extract_id_name_rows(&batches);
415+
rows.sort_by_key(|(id, _)| *id);
416+
assert_eq!(
417+
rows,
418+
vec![
419+
(1, "alice".to_string()),
420+
(2, "bob".to_string()),
421+
(3, "carol".to_string()),
422+
(4, "dave".to_string()),
423+
],
424+
"Snapshot 2 should contain all rows"
425+
);
426+
}

crates/paimon/src/spec/core_options.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
2323
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
2424
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
2525
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
26+
pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
27+
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
2628
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
2729
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
2830
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -88,6 +90,20 @@ impl<'a> CoreOptions<'a> {
8890
.map(|v| v.eq_ignore_ascii_case("true"))
8991
.unwrap_or(true)
9092
}
93+
94+
/// Snapshot id for time travel via `scan.snapshot-id`.
95+
pub fn scan_snapshot_id(&self) -> Option<i64> {
96+
self.options
97+
.get(SCAN_SNAPSHOT_ID_OPTION)
98+
.and_then(|v| v.parse().ok())
99+
}
100+
101+
/// Timestamp in millis for time travel via `scan.timestamp-millis`.
102+
pub fn scan_timestamp_millis(&self) -> Option<i64> {
103+
self.options
104+
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
105+
.and_then(|v| v.parse().ok())
106+
}
91107
}
92108

93109
/// Parse a memory size string to bytes using binary (1024-based) semantics.

crates/paimon/src/spec/schema.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ impl TableSchema {
9494
&self.options
9595
}
9696

97+
/// Create a copy of this schema with extra options merged in.
98+
pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
99+
let mut new_schema = self.clone();
100+
new_schema.options.extend(extra);
101+
new_schema
102+
}
103+
97104
pub fn comment(&self) -> Option<&str> {
98105
self.comment.as_deref()
99106
}

crates/paimon/src/table/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub use table_scan::TableScan;
3434
use crate::catalog::Identifier;
3535
use crate::io::FileIO;
3636
use crate::spec::TableSchema;
37+
use std::collections::HashMap;
3738

3839
/// Table represents a table in the catalog.
3940
#[derive(Debug, Clone)]
@@ -87,6 +88,16 @@ impl Table {
8788
pub fn new_read_builder(&self) -> ReadBuilder<'_> {
8889
ReadBuilder::new(self)
8990
}
91+
92+
/// Create a copy of this table with extra options merged into the schema.
93+
pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
94+
Self {
95+
file_io: self.file_io.clone(),
96+
identifier: self.identifier.clone(),
97+
location: self.location.clone(),
98+
schema: self.schema.copy_with_options(extra),
99+
}
100+
}
90101
}
91102

92103
/// A stream of arrow [`RecordBatch`]es.

0 commit comments

Comments
 (0)