Skip to content

Commit 6635792

Browse files
authored
fix(scan): Harden time-travel selector validation (#219)
1 parent 022da71 commit 6635792

5 files changed

Lines changed: 332 additions & 27 deletions

File tree

crates/integration_tests/tests/read_tables.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,6 +1799,70 @@ async fn test_time_travel_by_tag_name() {
17991799
);
18001800
}
18011801

1802+
#[tokio::test]
1803+
async fn test_time_travel_conflicting_selectors_fail() {
1804+
let catalog = create_file_system_catalog();
1805+
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
1806+
1807+
let conflicted = table.copy_with_options(HashMap::from([
1808+
("scan.tag-name".to_string(), "snapshot1".to_string()),
1809+
("scan.snapshot-id".to_string(), "2".to_string()),
1810+
]));
1811+
1812+
let plan_err = conflicted
1813+
.new_read_builder()
1814+
.new_scan()
1815+
.plan()
1816+
.await
1817+
.expect_err("conflicting time-travel selectors should fail");
1818+
1819+
match plan_err {
1820+
Error::DataInvalid { message, .. } => {
1821+
assert!(
1822+
message.contains("Only one time-travel selector may be set"),
1823+
"unexpected conflict error: {message}"
1824+
);
1825+
assert!(
1826+
message.contains("scan.snapshot-id"),
1827+
"conflict error should mention scan.snapshot-id: {message}"
1828+
);
1829+
assert!(
1830+
message.contains("scan.tag-name"),
1831+
"conflict error should mention scan.tag-name: {message}"
1832+
);
1833+
}
1834+
other => panic!("unexpected error: {other:?}"),
1835+
}
1836+
}
1837+
1838+
#[tokio::test]
1839+
async fn test_time_travel_invalid_numeric_selector_fails() {
1840+
let catalog = create_file_system_catalog();
1841+
let table = get_table_from_catalog(&catalog, "time_travel_table").await;
1842+
1843+
let invalid = table.copy_with_options(HashMap::from([(
1844+
"scan.snapshot-id".to_string(),
1845+
"not-a-number".to_string(),
1846+
)]));
1847+
1848+
let plan_err = invalid
1849+
.new_read_builder()
1850+
.new_scan()
1851+
.plan()
1852+
.await
1853+
.expect_err("invalid numeric time-travel selector should fail");
1854+
1855+
match plan_err {
1856+
Error::DataInvalid { message, .. } => {
1857+
assert!(
1858+
message.contains("Invalid value for scan.snapshot-id"),
1859+
"unexpected invalid selector error: {message}"
1860+
);
1861+
}
1862+
other => panic!("unexpected error: {other:?}"),
1863+
}
1864+
}
1865+
18021866
// ---------------------------------------------------------------------------
18031867
// Data evolution + drop column tests
18041868
// ---------------------------------------------------------------------------

crates/integrations/datafusion/tests/read_tables.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
1819
use std::sync::Arc;
1920

2021
use datafusion::arrow::array::{Array, Int32Array, StringArray};
@@ -57,6 +58,21 @@ async fn create_provider(table_name: &str) -> PaimonTableProvider {
5758
PaimonTableProvider::try_new(table).expect("Failed to create table provider")
5859
}
5960

61+
async fn create_provider_with_options(
62+
table_name: &str,
63+
extra_options: HashMap<String, String>,
64+
) -> PaimonTableProvider {
65+
let catalog = create_catalog();
66+
let identifier = Identifier::new("default", table_name);
67+
let table = catalog
68+
.get_table(&identifier)
69+
.await
70+
.expect("Failed to get table")
71+
.copy_with_options(extra_options);
72+
73+
PaimonTableProvider::try_new(table).expect("Failed to create table provider")
74+
}
75+
6076
async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
6177
let batches = collect_query(table_name, &format!("SELECT id, name FROM {table_name}"))
6278
.await
@@ -469,6 +485,71 @@ async fn test_time_travel_by_tag_name() {
469485
);
470486
}
471487

488+
#[tokio::test]
489+
async fn test_time_travel_conflicting_selectors_fail() {
490+
let provider = create_provider_with_options(
491+
"time_travel_table",
492+
HashMap::from([("scan.tag-name".to_string(), "snapshot1".to_string())]),
493+
)
494+
.await;
495+
496+
let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
497+
let ctx = SessionContext::new_with_config(config);
498+
ctx.register_table("time_travel_table", Arc::new(provider))
499+
.expect("Failed to register table");
500+
ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
501+
.expect("Failed to register relation planner");
502+
503+
let err = ctx
504+
.sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2")
505+
.await
506+
.expect("time travel query should parse")
507+
.collect()
508+
.await
509+
.expect_err("conflicting time-travel selectors should fail");
510+
511+
let message = err.to_string();
512+
assert!(
513+
message.contains("Only one time-travel selector may be set"),
514+
"unexpected conflict error: {message}"
515+
);
516+
assert!(
517+
message.contains("scan.snapshot-id"),
518+
"conflict error should mention scan.snapshot-id: {message}"
519+
);
520+
assert!(
521+
message.contains("scan.tag-name"),
522+
"conflict error should mention scan.tag-name: {message}"
523+
);
524+
}
525+
526+
#[tokio::test]
527+
async fn test_time_travel_invalid_numeric_selector_fails() {
528+
let provider = create_provider_with_options(
529+
"time_travel_table",
530+
HashMap::from([("scan.snapshot-id".to_string(), "not-a-number".to_string())]),
531+
)
532+
.await;
533+
534+
let ctx = SessionContext::new();
535+
ctx.register_table("time_travel_table", Arc::new(provider))
536+
.expect("Failed to register table");
537+
538+
let err = ctx
539+
.sql("SELECT id, name FROM time_travel_table")
540+
.await
541+
.expect("query should parse")
542+
.collect()
543+
.await
544+
.expect_err("invalid numeric time-travel selector should fail");
545+
546+
let message = err.to_string();
547+
assert!(
548+
message.contains("Invalid value for scan.snapshot-id"),
549+
"unexpected invalid selector error: {message}"
550+
);
551+
}
552+
472553
/// Verifies that data evolution merge correctly NULL-fills columns that no file in a
473554
/// merge group provides (e.g. a newly added column after MERGE INTO on old rows).
474555
/// Without the fix, `active_file_indices` would be empty and rows would be silently lost.

crates/paimon/src/spec/core_options.rs

Lines changed: 158 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ pub struct CoreOptions<'a> {
4040
options: &'a HashMap<String, String>,
4141
}
4242

43+
/// A single, validated time-travel target derived from the scan options.
///
/// The tag variant borrows its name from the options map, hence the lifetime.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum TimeTravelSelector<'a> {
    /// Tag name taken from `scan.tag-name`.
    TagName(&'a str),
    /// Snapshot id parsed from `scan.snapshot-id`.
    SnapshotId(i64),
    /// Timestamp in milliseconds parsed from `scan.timestamp-millis`.
    TimestampMillis(i64),
}
49+
4350
impl<'a> CoreOptions<'a> {
4451
pub fn new(options: &'a HashMap<String, String>) -> Self {
4552
Self { options }
@@ -94,25 +101,90 @@ impl<'a> CoreOptions<'a> {
94101
.unwrap_or(true)
95102
}
96103

97-
/// Snapshot id for time travel via `scan.snapshot-id`.
104+
fn parse_i64_option(&self, option_name: &'static str) -> crate::Result<Option<i64>> {
105+
match self.options.get(option_name) {
106+
Some(value) => value
107+
.parse::<i64>()
108+
.map(Some)
109+
.map_err(|e| crate::Error::DataInvalid {
110+
message: format!("Invalid value for {option_name}: '{value}'"),
111+
source: Some(Box::new(e)),
112+
}),
113+
None => Ok(None),
114+
}
115+
}
116+
117+
/// Raw snapshot id accessor for `scan.snapshot-id`.
118+
///
119+
/// This compatibility accessor is lossy: it returns `None` for absent or
120+
/// invalid values and does not validate selector conflicts. Internal
121+
/// time-travel planning should use `try_time_travel_selector`.
98122
pub fn scan_snapshot_id(&self) -> Option<i64> {
99123
self.options
100124
.get(SCAN_SNAPSHOT_ID_OPTION)
101125
.and_then(|v| v.parse().ok())
102126
}
103127

104-
/// Timestamp in millis for time travel via `scan.timestamp-millis`.
128+
/// Raw timestamp accessor for `scan.timestamp-millis`.
129+
///
130+
/// This compatibility accessor is lossy: it returns `None` for absent or
131+
/// invalid values and does not validate selector conflicts. Internal
132+
/// time-travel planning should use `try_time_travel_selector`.
105133
pub fn scan_timestamp_millis(&self) -> Option<i64> {
106134
self.options
107135
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
108136
.and_then(|v| v.parse().ok())
109137
}
110138

111-
/// Tag name for time travel via `scan.tag-name`.
112-
pub fn scan_tag_name(&self) -> Option<&str> {
139+
/// Raw tag name accessor for `scan.tag-name`.
140+
///
141+
/// This compatibility accessor does not validate selector conflicts.
142+
/// Internal time-travel planning should use `try_time_travel_selector`.
143+
pub fn scan_tag_name(&self) -> Option<&'a str> {
113144
self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
114145
}
115146

147+
fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
148+
let mut selectors = Vec::with_capacity(3);
149+
if self.options.contains_key(SCAN_TAG_NAME_OPTION) {
150+
selectors.push(SCAN_TAG_NAME_OPTION);
151+
}
152+
if self.options.contains_key(SCAN_SNAPSHOT_ID_OPTION) {
153+
selectors.push(SCAN_SNAPSHOT_ID_OPTION);
154+
}
155+
if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) {
156+
selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION);
157+
}
158+
selectors
159+
}
160+
161+
/// Validates and normalizes the internal time-travel selector.
162+
///
163+
/// This is the semantic owner for selector mutual exclusion and strict
164+
/// numeric parsing.
165+
pub(crate) fn try_time_travel_selector(&self) -> crate::Result<Option<TimeTravelSelector<'a>>> {
166+
let selectors = self.configured_time_travel_selectors();
167+
if selectors.len() > 1 {
168+
return Err(crate::Error::DataInvalid {
169+
message: format!(
170+
"Only one time-travel selector may be set, found: {}",
171+
selectors.join(", ")
172+
),
173+
source: None,
174+
});
175+
}
176+
177+
if let Some(tag_name) = self.scan_tag_name() {
178+
Ok(Some(TimeTravelSelector::TagName(tag_name)))
179+
} else if let Some(id) = self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
180+
Ok(Some(TimeTravelSelector::SnapshotId(id)))
181+
} else if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
182+
Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
183+
} else {
184+
Ok(None)
185+
}
186+
}
187+
116188
/// Explicit bucket key columns. If not set, defaults to primary keys for PK tables.
117189
pub fn bucket_key(&self) -> Option<Vec<String>> {
118190
self.options
@@ -230,4 +302,86 @@ mod tests {
230302
assert_eq!(core.partition_default_name(), "NULL_PART");
231303
assert!(!core.legacy_partition_name());
232304
}
305+
306+
/// Two selector keys at once must be rejected with a `DataInvalid` error that
/// names both offending options.
#[test]
fn test_try_time_travel_selector_rejects_conflicting_selectors() {
    let mut options = HashMap::new();
    options.insert(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string());
    options.insert(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string());

    let err = CoreOptions::new(&options)
        .try_time_travel_selector()
        .expect_err("conflicting selectors should fail");

    let message = match err {
        crate::Error::DataInvalid { message, .. } => message,
        other => panic!("unexpected error: {other:?}"),
    };
    assert!(message.contains("Only one time-travel selector may be set"));
    assert!(message.contains(SCAN_TAG_NAME_OPTION));
    assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
}
326+
327+
/// Non-numeric values for either numeric selector must produce a `DataInvalid`
/// error whose message names the offending option.
#[test]
fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
    // (option key, bad value, expectation text used if no error is returned)
    let cases = [
        (SCAN_SNAPSHOT_ID_OPTION, "abc", "invalid snapshot id should fail"),
        (
            SCAN_TIMESTAMP_MILLIS_OPTION,
            "xyz",
            "invalid timestamp millis should fail",
        ),
    ];

    for &(option_name, raw_value, expectation) in &cases {
        let options = HashMap::from([(option_name.to_string(), raw_value.to_string())]);
        let core = CoreOptions::new(&options);

        match core.try_time_travel_selector().expect_err(expectation) {
            crate::Error::DataInvalid { message, .. } => {
                assert!(message.contains(option_name));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
}
357+
358+
/// Each selector, when configured alone with a valid value, resolves to the
/// matching `TimeTravelSelector` variant.
#[test]
fn test_try_time_travel_selector_normalizes_valid_selector() {
    // Tag name: the resolved selector borrows from the options map.
    let mut by_tag = HashMap::new();
    by_tag.insert(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string());
    let resolved_tag = CoreOptions::new(&by_tag)
        .try_time_travel_selector()
        .expect("tag selector");
    assert_eq!(resolved_tag, Some(TimeTravelSelector::TagName("tag1")));

    // Snapshot id: parsed into an i64.
    let mut by_snapshot = HashMap::new();
    by_snapshot.insert(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string());
    let resolved_snapshot = CoreOptions::new(&by_snapshot)
        .try_time_travel_selector()
        .expect("snapshot selector");
    assert_eq!(resolved_snapshot, Some(TimeTravelSelector::SnapshotId(7)));

    // Timestamp millis: parsed into an i64.
    let mut by_timestamp = HashMap::new();
    by_timestamp.insert(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string());
    let resolved_timestamp = CoreOptions::new(&by_timestamp)
        .try_time_travel_selector()
        .expect("timestamp selector");
    assert_eq!(
        resolved_timestamp,
        Some(TimeTravelSelector::TimestampMillis(1234))
    );
}
233387
}

crates/paimon/src/spec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod data_file;
2626
pub use data_file::*;
2727

2828
mod core_options;
29+
pub(crate) use core_options::TimeTravelSelector;
2930
pub use core_options::*;
3031

3132
mod schema;

0 commit comments

Comments
 (0)