Skip to content

Commit e0d1f69

Browse files
authored
feat(tantivy): Add Tantivy full-text search with on-demand archive reading (#231)
Introduce a complete Tantivy-based full-text search pipeline for global indexes, with on-demand I/O throughout:
- ArchiveDirectory: reads only the archive header eagerly; file data is loaded via async FileRead when Tantivy requests it (sync-to-async bridge using std::thread::scope).
- TantivyFullTextWriter: streams the packed archive directly to an OutputFile instead of buffering in memory.
- TantivyFullTextReader: opens from InputFile/FileRead, never loads the full archive into memory.
- FullTextSearchBuilder: self-contained builder on Table that reads the index manifest, evaluates searches against multiple Tantivy indexes in parallel (try_join_all), and returns ScoredGlobalIndexResult.
- ScoredGlobalIndexResult + bitmap_to_ranges moved to table/source.rs (alongside RowRange) so vector search can reuse them later.
- TableScan.with_row_ranges(): generic row-range filtering, decoupled from full-text specifics.
- DataFusion full_text_search UDTF integration with test data.
1 parent e65d222 commit e0d1f69

17 files changed

Lines changed: 1715 additions & 44 deletions

File tree

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353
run: cargo fmt --all -- --check
5454

5555
- name: Clippy
56-
run: cargo clippy --all-targets --workspace -- -D warnings
56+
run: cargo clippy --all-targets --workspace --features fulltext -- -D warnings
5757

5858
build:
5959
runs-on: ${{ matrix.os }}
@@ -66,7 +66,7 @@ jobs:
6666
steps:
6767
- uses: actions/checkout@v6
6868
- name: Build
69-
run: cargo build
69+
run: cargo build --features fulltext
7070

7171
unit:
7272
runs-on: ${{ matrix.os }}
@@ -80,7 +80,7 @@ jobs:
8080
- uses: actions/checkout@v6
8181

8282
- name: Test
83-
run: cargo test -p paimon --all-targets
83+
run: cargo test -p paimon --all-targets --features fulltext
8484
env:
8585
RUST_LOG: DEBUG
8686
RUST_BACKTRACE: full

crates/integrations/datafusion/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
name = "paimon-datafusion"
2020
edition.workspace = true
2121
version.workspace = true
22+
exclude = ["testdata/"]
2223
license.workspace = true
2324
homepage = "https://paimon.apache.org/docs/rust/datafusion/"
2425
documentation = "https://docs.rs/paimon-datafusion"
2526
description = "Apache Paimon DataFusion Integration"
2627
categories = ["database"]
2728
keywords = ["paimon", "datafusion", "integrations"]
2829

30+
[features]
31+
fulltext = ["paimon/fulltext"]
32+
2933
[dependencies]
3034
async-trait = "0.1"
3135
chrono = "0.4"
@@ -37,8 +41,10 @@ tokio = { workspace = true, features = ["rt", "time", "fs"] }
3741
[dev-dependencies]
3842
arrow-array = { workspace = true }
3943
arrow-schema = { workspace = true }
44+
flate2 = "1"
4045
parquet = { workspace = true }
4146
serde = "1"
4247
serde_json = "1"
48+
tar = "0.4"
4349
tempfile = "3"
4450
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! `full_text_search` table-valued function for DataFusion.
19+
//!
20+
//! Usage:
21+
//! ```sql
22+
//! SELECT * FROM full_text_search('table_name', 'column_name', 'query text', 10)
23+
//! ```
24+
//!
25+
//! Reference: [PaimonTableValuedFunctions.scala](https://github.com/apache/paimon/blob/master/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonTableValuedFunctions.scala)
26+
27+
use std::any::Any;
28+
use std::fmt::Debug;
29+
use std::sync::Arc;
30+
31+
use async_trait::async_trait;
32+
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
33+
use datafusion::catalog::Session;
34+
use datafusion::catalog::TableFunctionImpl;
35+
use datafusion::datasource::{TableProvider, TableType};
36+
use datafusion::error::Result as DFResult;
37+
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
38+
use datafusion::physical_plan::ExecutionPlan;
39+
use datafusion::prelude::SessionContext;
40+
use paimon::catalog::{Catalog, Identifier};
41+
42+
use crate::error::to_datafusion_error;
43+
use crate::runtime::{await_with_runtime, block_on_with_runtime};
44+
use crate::table::{build_paimon_scan, PaimonTableProvider};
45+
46+
/// Register the `full_text_search` table-valued function on a [`SessionContext`].
47+
pub fn register_full_text_search(
48+
ctx: &SessionContext,
49+
catalog: Arc<dyn Catalog>,
50+
default_database: &str,
51+
) {
52+
ctx.register_udtf(
53+
"full_text_search",
54+
Arc::new(FullTextSearchFunction::new(catalog, default_database)),
55+
);
56+
}
57+
58+
/// Table function that performs full-text search on a Paimon table.
59+
///
60+
/// Arguments: `(table_name STRING, column_name STRING, query_text STRING, limit INT)`
61+
pub struct FullTextSearchFunction {
62+
catalog: Arc<dyn Catalog>,
63+
default_database: String,
64+
}
65+
66+
impl Debug for FullTextSearchFunction {
67+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68+
f.debug_struct("FullTextSearchFunction")
69+
.field("default_database", &self.default_database)
70+
.finish()
71+
}
72+
}
73+
74+
impl FullTextSearchFunction {
75+
pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
76+
Self {
77+
catalog,
78+
default_database: default_database.to_string(),
79+
}
80+
}
81+
}
82+
83+
impl TableFunctionImpl for FullTextSearchFunction {
84+
fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
85+
if args.len() != 4 {
86+
return Err(datafusion::error::DataFusionError::Plan(
87+
"full_text_search requires 4 arguments: (table_name, column_name, query_text, limit)".to_string(),
88+
));
89+
}
90+
91+
let table_name = extract_string_literal(&args[0], "table_name")?;
92+
let column_name = extract_string_literal(&args[1], "column_name")?;
93+
let query_text = extract_string_literal(&args[2], "query_text")?;
94+
let limit = extract_int_literal(&args[3], "limit")?;
95+
96+
if limit <= 0 {
97+
return Err(datafusion::error::DataFusionError::Plan(
98+
"full_text_search: limit must be positive".to_string(),
99+
));
100+
}
101+
102+
let identifier = parse_table_identifier(&table_name, &self.default_database)?;
103+
104+
let catalog = Arc::clone(&self.catalog);
105+
let table = block_on_with_runtime(
106+
async move { catalog.get_table(&identifier).await },
107+
"full_text_search: catalog access thread panicked",
108+
)
109+
.map_err(to_datafusion_error)?;
110+
111+
let inner = PaimonTableProvider::try_new(table)?;
112+
113+
Ok(Arc::new(FullTextSearchTableProvider {
114+
inner,
115+
column_name,
116+
query_text,
117+
limit: limit as usize,
118+
}))
119+
}
120+
}
121+
122+
/// A wrapper around [`PaimonTableProvider`] that injects full-text search
123+
/// row filtering into the scan path.
124+
#[derive(Debug)]
125+
struct FullTextSearchTableProvider {
126+
inner: PaimonTableProvider,
127+
column_name: String,
128+
query_text: String,
129+
limit: usize,
130+
}
131+
132+
#[async_trait]
133+
impl TableProvider for FullTextSearchTableProvider {
134+
fn as_any(&self) -> &dyn Any {
135+
self
136+
}
137+
138+
fn schema(&self) -> ArrowSchemaRef {
139+
self.inner.schema()
140+
}
141+
142+
fn table_type(&self) -> TableType {
143+
TableType::Base
144+
}
145+
146+
async fn scan(
147+
&self,
148+
state: &dyn Session,
149+
projection: Option<&Vec<usize>>,
150+
_filters: &[Expr],
151+
limit: Option<usize>,
152+
) -> DFResult<Arc<dyn ExecutionPlan>> {
153+
let table = self.inner.table();
154+
155+
// Use FullTextSearchBuilder to execute the search.
156+
let row_ranges = await_with_runtime(async {
157+
let mut builder = table.new_full_text_search_builder();
158+
builder
159+
.with_text_column(&self.column_name)
160+
.with_query_text(&self.query_text)
161+
.with_limit(self.limit);
162+
builder.execute().await.map_err(to_datafusion_error)
163+
})
164+
.await?;
165+
166+
// Convert search results to row ranges and inject into the scan.
167+
let mut read_builder = table.new_read_builder();
168+
if let Some(limit) = limit {
169+
read_builder.with_limit(limit);
170+
}
171+
let scan = if row_ranges.is_empty() {
172+
read_builder.new_scan()
173+
} else {
174+
read_builder.new_scan().with_row_ranges(row_ranges)
175+
};
176+
let plan = await_with_runtime(scan.plan())
177+
.await
178+
.map_err(to_datafusion_error)?;
179+
180+
let target = state.config_options().execution.target_partitions;
181+
build_paimon_scan(
182+
table,
183+
&self.schema(),
184+
&plan,
185+
projection,
186+
None,
187+
limit,
188+
target,
189+
)
190+
}
191+
192+
fn supports_filters_pushdown(
193+
&self,
194+
filters: &[&Expr],
195+
) -> DFResult<Vec<TableProviderFilterPushDown>> {
196+
Ok(vec![
197+
TableProviderFilterPushDown::Unsupported;
198+
filters.len()
199+
])
200+
}
201+
}
202+
203+
fn extract_string_literal(expr: &Expr, name: &str) -> DFResult<String> {
204+
match expr {
205+
Expr::Literal(scalar, _) => {
206+
let s = scalar.try_as_str().flatten().ok_or_else(|| {
207+
datafusion::error::DataFusionError::Plan(format!(
208+
"full_text_search: {name} must be a string literal, got: {expr}"
209+
))
210+
})?;
211+
Ok(s.to_string())
212+
}
213+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
214+
"full_text_search: {name} must be a literal, got: {expr}"
215+
))),
216+
}
217+
}
218+
219+
fn extract_int_literal(expr: &Expr, name: &str) -> DFResult<i64> {
220+
use datafusion::common::ScalarValue;
221+
match expr {
222+
Expr::Literal(scalar, _) => match scalar {
223+
ScalarValue::Int8(Some(v)) => Ok(*v as i64),
224+
ScalarValue::Int16(Some(v)) => Ok(*v as i64),
225+
ScalarValue::Int32(Some(v)) => Ok(*v as i64),
226+
ScalarValue::Int64(Some(v)) => Ok(*v),
227+
ScalarValue::UInt8(Some(v)) => Ok(*v as i64),
228+
ScalarValue::UInt16(Some(v)) => Ok(*v as i64),
229+
ScalarValue::UInt32(Some(v)) => Ok(*v as i64),
230+
ScalarValue::UInt64(Some(v)) => i64::try_from(*v).map_err(|_| {
231+
datafusion::error::DataFusionError::Plan(format!(
232+
"full_text_search: {name} value {v} exceeds i64 range"
233+
))
234+
}),
235+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
236+
"full_text_search: {name} must be an integer literal, got: {expr}"
237+
))),
238+
},
239+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
240+
"full_text_search: {name} must be a literal, got: {expr}"
241+
))),
242+
}
243+
}
244+
245+
fn parse_table_identifier(name: &str, default_database: &str) -> DFResult<Identifier> {
246+
let parts: Vec<&str> = name.split('.').collect();
247+
match parts.len() {
248+
1 => Ok(Identifier::new(default_database, parts[0])),
249+
2 => Ok(Identifier::new(parts[0], parts[1])),
250+
// 3-part name: catalog.database.table — ignore catalog prefix
251+
3 => Ok(Identifier::new(parts[1], parts[2])),
252+
_ => Err(datafusion::error::DataFusionError::Plan(format!(
253+
"full_text_search: invalid table name '{name}', expected 'table', 'database.table', or 'catalog.database.table'"
254+
))),
255+
}
256+
}

crates/integrations/datafusion/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,17 @@
3939
mod catalog;
4040
mod error;
4141
mod filter_pushdown;
42+
#[cfg(feature = "fulltext")]
43+
mod full_text_search;
4244
mod physical_plan;
4345
mod relation_planner;
4446
pub mod runtime;
4547
mod table;
4648

4749
pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
4850
pub use error::to_datafusion_error;
51+
#[cfg(feature = "fulltext")]
52+
pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
4953
pub use physical_plan::PaimonTableScan;
5054
pub use relation_planner::PaimonRelationPlanner;
5155
pub use table::PaimonTableProvider;

0 commit comments

Comments
 (0)