Skip to content

Commit 30d76b9

Browse files
authored
feat: support time travel by tag name and doc it (#199)
1 parent 38830ef commit 30d76b9

11 files changed

Lines changed: 281 additions & 44 deletions

File tree

crates/integrations/datafusion/src/relation_planner.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ use datafusion::logical_expr::planner::{
2929
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
3030
};
3131
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
32-
use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
32+
use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
3333

3434
use crate::table::PaimonTableProvider;
3535

3636
/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
3737
/// on Paimon tables and resolves them to time travel options.
3838
///
3939
/// - Integer literal → sets `scan.snapshot-id` option on the table.
40-
/// - String literal → parsed as a timestamp, sets `scan.timestamp-millis` option.
40+
/// - String literal (timestamp) → parsed as a timestamp, sets `scan.timestamp-millis` option.
41+
/// - String literal (other) → sets `scan.tag-name` option on the table.
4142
#[derive(Debug)]
4243
pub struct PaimonRelationPlanner;
4344

@@ -138,7 +139,8 @@ fn object_name_to_table_reference(
138139
/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
139140
///
140141
/// - Integer literal → `{"scan.snapshot-id": "N"}`
141-
/// - String literal (timestamp) → parse to millis → `{"scan.timestamp-millis": "M"}`
142+
/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → `{"scan.timestamp-millis": "M"}`
143+
/// - String literal (other) → `{"scan.tag-name": "S"}`
142144
fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
143145
match expr {
144146
ast::Expr::Value(v) => match &v.value {
@@ -155,18 +157,24 @@ fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, Str
155157
)]))
156158
}
157159
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
158-
let timestamp_millis = parse_timestamp_to_millis(s)?;
159-
Ok(HashMap::from([(
160-
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
161-
timestamp_millis.to_string(),
162-
)]))
160+
// Try parsing as timestamp first; fall back to tag name.
161+
match parse_timestamp_to_millis(s) {
162+
Ok(timestamp_millis) => Ok(HashMap::from([(
163+
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
164+
timestamp_millis.to_string(),
165+
)])),
166+
Err(_) => Ok(HashMap::from([(
167+
SCAN_TAG_NAME_OPTION.to_string(),
168+
s.clone(),
169+
)])),
170+
}
163171
}
164172
_ => Err(datafusion::error::DataFusionError::Plan(format!(
165173
"Unsupported time travel expression: {expr}"
166174
))),
167175
},
168176
_ => Err(datafusion::error::DataFusionError::Plan(format!(
169-
"Unsupported time travel expression: {expr}. Expected an integer snapshot id or a timestamp string."
177+
"Unsupported time travel expression: {expr}. Expected an integer snapshot id, a timestamp string, or a tag name."
170178
))),
171179
}
172180
}

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,3 +424,47 @@ async fn test_time_travel_by_snapshot_id() {
424424
"Snapshot 2 should contain all rows"
425425
);
426426
}
427+
428+
#[tokio::test]
429+
async fn test_time_travel_by_tag_name() {
430+
let ctx = create_time_travel_context().await;
431+
432+
// Tag 'snapshot1' points to snapshot 1: should contain only (alice, bob)
433+
let batches = ctx
434+
.sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 'snapshot1'")
435+
.await
436+
.expect("tag time travel query should parse")
437+
.collect()
438+
.await
439+
.expect("tag time travel query should execute");
440+
441+
let mut rows = extract_id_name_rows(&batches);
442+
rows.sort_by_key(|(id, _)| *id);
443+
assert_eq!(
444+
rows,
445+
vec![(1, "alice".to_string()), (2, "bob".to_string())],
446+
"Tag 'snapshot1' should contain only the first batch of rows"
447+
);
448+
449+
// Tag 'snapshot2' points to snapshot 2: should contain all rows
450+
let batches = ctx
451+
.sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 'snapshot2'")
452+
.await
453+
.expect("tag time travel query should parse")
454+
.collect()
455+
.await
456+
.expect("tag time travel query should execute");
457+
458+
let mut rows = extract_id_name_rows(&batches);
459+
rows.sort_by_key(|(id, _)| *id);
460+
assert_eq!(
461+
rows,
462+
vec![
463+
(1, "alice".to_string()),
464+
(2, "bob".to_string()),
465+
(3, "carol".to_string()),
466+
(4, "dave".to_string()),
467+
],
468+
"Tag 'snapshot2' should contain all rows"
469+
);
470+
}

crates/paimon/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ pub use catalog::FileSystemCatalog;
3939

4040
pub use table::{
4141
DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, SnapshotManager,
42-
Table, TableRead, TableScan,
42+
Table, TableRead, TableScan, TagManager,
4343
};

crates/paimon/src/spec/core_options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
2525
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
2626
pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
2727
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
28+
pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name";
2829
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
2930
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
3031
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -104,6 +105,11 @@ impl<'a> CoreOptions<'a> {
104105
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
105106
.and_then(|v| v.parse().ok())
106107
}
108+
109+
/// Tag name for time travel via `scan.tag-name`.
110+
pub fn scan_tag_name(&self) -> Option<&str> {
111+
self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
112+
}
107113
}
108114

109115
/// Parse a memory size string to bytes using binary (1024-based) semantics.

crates/paimon/src/table/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod read_builder;
2222
mod snapshot_manager;
2323
mod source;
2424
mod table_scan;
25+
mod tag_manager;
2526

2627
use crate::Result;
2728
use arrow_array::RecordBatch;
@@ -30,6 +31,7 @@ pub use read_builder::{ReadBuilder, TableRead};
3031
pub use snapshot_manager::SnapshotManager;
3132
pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan};
3233
pub use table_scan::TableScan;
34+
pub use tag_manager::TagManager;
3335

3436
use crate::catalog::Identifier;
3537
use crate::io::FileIO;

crates/paimon/src/table/table_scan.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::spec::{
2929
use crate::table::bin_pack::split_for_batch;
3030
use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan};
3131
use crate::table::SnapshotManager;
32+
use crate::table::TagManager;
3233
use crate::Error;
3334
use std::collections::{HashMap, HashSet};
3435

@@ -253,7 +254,18 @@ impl<'a> TableScan<'a> {
253254
let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
254255
let core_options = CoreOptions::new(self.table.schema().options());
255256

256-
let snapshot = if let Some(id) = core_options.scan_snapshot_id() {
257+
let snapshot = if let Some(tag_name) = core_options.scan_tag_name() {
258+
let tag_manager = TagManager::new(file_io.clone(), table_path.to_string());
259+
match tag_manager.get(tag_name).await? {
260+
Some(s) => s,
261+
None => {
262+
return Err(Error::DataInvalid {
263+
message: format!("Tag '{tag_name}' doesn't exist."),
264+
source: None,
265+
})
266+
}
267+
}
268+
} else if let Some(id) = core_options.scan_snapshot_id() {
257269
snapshot_manager.get_snapshot(id).await?
258270
} else if let Some(ts) = core_options.scan_timestamp_millis() {
259271
match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Tag manager for reading tag metadata using FileIO.
19+
//!
20+
//! Reference: [org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
21+
//! and [pypaimon.tag.tag_manager.TagManager](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tag/tag_manager.py).
22+
23+
use crate::io::FileIO;
24+
use crate::spec::Snapshot;
25+
26+
const TAG_DIR: &str = "tag";
27+
const TAG_PREFIX: &str = "tag-";
28+
29+
/// Manager for tag files using unified FileIO.
30+
///
31+
/// Tags are named snapshots stored as JSON files at `{table_path}/tag/tag-{name}`.
32+
/// The tag file format is identical to a Snapshot JSON file.
33+
///
34+
/// Reference: [org.apache.paimon.utils.TagManager](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java)
35+
#[derive(Debug, Clone)]
36+
pub struct TagManager {
37+
file_io: FileIO,
38+
table_path: String,
39+
}
40+
41+
impl TagManager {
42+
pub fn new(file_io: FileIO, table_path: String) -> Self {
43+
Self {
44+
file_io,
45+
table_path,
46+
}
47+
}
48+
49+
/// Path to the tag directory (e.g. `table_path/tag`).
50+
pub fn tag_directory(&self) -> String {
51+
format!("{}/{}", self.table_path, TAG_DIR)
52+
}
53+
54+
/// Path to the tag file for the given name (e.g. `tag/tag-my_tag`).
55+
pub fn tag_path(&self, tag_name: &str) -> String {
56+
format!("{}/{}{}", self.tag_directory(), TAG_PREFIX, tag_name)
57+
}
58+
59+
/// Check if a tag exists.
60+
pub async fn tag_exists(&self, tag_name: &str) -> crate::Result<bool> {
61+
let path = self.tag_path(tag_name);
62+
let input = self.file_io.new_input(&path)?;
63+
input.exists().await
64+
}
65+
66+
/// Get the snapshot for a tag, or None if the tag file does not exist.
67+
///
68+
/// Tag files are JSON with the same schema as Snapshot.
69+
/// Reads directly and catches NotFound to avoid a separate exists() IO round-trip.
70+
pub async fn get(&self, tag_name: &str) -> crate::Result<Option<Snapshot>> {
71+
let path = self.tag_path(tag_name);
72+
let input = self.file_io.new_input(&path)?;
73+
let bytes = match input.read().await {
74+
Ok(b) => b,
75+
Err(crate::Error::IoUnexpected { ref source, .. })
76+
if source.kind() == opendal::ErrorKind::NotFound =>
77+
{
78+
return Ok(None);
79+
}
80+
Err(e) => return Err(e),
81+
};
82+
let snapshot: Snapshot =
83+
serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid {
84+
message: format!("tag '{tag_name}' JSON invalid: {e}"),
85+
source: Some(Box::new(e)),
86+
})?;
87+
Ok(Some(snapshot))
88+
}
89+
}

dev/spark/provision.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ def main():
286286
"""
287287
)
288288

289+
# Create tags for tag-based time travel tests
290+
# Tag 'snapshot1' points to snapshot 1 (alice, bob)
291+
# Tag 'snapshot2' points to snapshot 2 (alice, bob, carol, dave)
292+
spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot1', 1)")
293+
spark.sql("CALL sys.create_tag('default.time_travel_table', 'snapshot2', 2)")
294+
289295

290296
if __name__ == "__main__":
291297
main()

docs/mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ theme:
4848
nav:
4949
- Home: index.md
5050
- Getting Started: getting-started.md
51+
- DataFusion Integration: datafusion.md
5152
- Architecture: architecture.md
5253
- Releases: releases.md
5354
- Contributing: contributing.md

docs/src/datafusion.md

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion Integration
21+
22+
[Apache DataFusion](https://datafusion.apache.org/) is a fast, extensible query engine for building data-centric systems in Rust. The `paimon-datafusion` crate provides a read-only integration that lets you query Paimon tables using SQL.
23+
24+
## Setup
25+
26+
```toml
27+
[dependencies]
28+
paimon = "0.0.0"
29+
paimon-datafusion = "0.0.0"
30+
datafusion = "52"
31+
tokio = { version = "1", features = ["full"] }
32+
```
33+
34+
## Registering Tables
35+
36+
Register an entire Paimon catalog so all databases and tables are accessible via `catalog.database.table` syntax:
37+
38+
```rust
39+
use std::sync::Arc;
40+
use datafusion::prelude::SessionContext;
41+
use paimon_datafusion::PaimonCatalogProvider;
42+
43+
let ctx = SessionContext::new();
44+
ctx.register_catalog("paimon", Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
45+
46+
let df = ctx.sql("SELECT * FROM paimon.default.my_table").await?;
47+
df.show().await?;
48+
```
49+
50+
## Time Travel
51+
52+
Paimon supports time travel queries to read historical data. In DataFusion, this is done via the `FOR SYSTEM_TIME AS OF` clause.
53+
54+
### By Snapshot ID
55+
56+
Read data from a specific snapshot by passing an integer literal:
57+
58+
```sql
59+
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1
60+
```
61+
62+
This sets the `scan.snapshot-id` option and reads exactly that snapshot.
63+
64+
### By Timestamp
65+
66+
Read data as of a specific point in time by passing a timestamp string in `YYYY-MM-DD HH:MM:SS` format:
67+
68+
```sql
69+
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01 00:00:00'
70+
```
71+
72+
This finds the latest snapshot whose commit time is less than or equal to the given timestamp. The timestamp is interpreted in the local timezone.
73+
74+
### By Tag Name
75+
76+
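Read data from a named tag by passing a string literal that does not parse as a `YYYY-MM-DD HH:MM:SS` timestamp; any such string is used as the tag name and sets the `scan.tag-name` option: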
77+
78+
```sql
79+
SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag'
80+
```
81+
82+
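Tags are named snapshots created via Paimon's tag management (e.g., `CALL sys.create_tag(...)` in Spark). Reading by tag is useful for pinning a stable version of the data for reproducible queries. If no tag with the given name exists, the query fails with a "Tag '...' doesn't exist." error; note that a malformed timestamp string is also treated as a tag name and therefore fails the same way.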
83+
84+
### Enabling Time Travel Syntax
85+
86+
DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`. You also need to register the `PaimonRelationPlanner`:
87+
88+
```rust
89+
use std::sync::Arc;
90+
use datafusion::prelude::{SessionConfig, SessionContext};
91+
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner};
92+
93+
let config = SessionConfig::new()
94+
.set_str("datafusion.sql_parser.dialect", "BigQuery");
95+
let ctx = SessionContext::new_with_config(config);
96+
97+
ctx.register_catalog("paimon", Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))));
98+
ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?;
99+
100+
// Now time travel queries work
101+
let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1").await?;
102+
```

0 commit comments

Comments
 (0)