Commit cd72769

cpsievert and claude committed
refactor: use appender-arrow API for DataFrame registration
Replace manual 2048-row chunking with duckdb-rs's Appender API, which handles chunking internally via zero-copy Arrow slicing (duckdb-rs#530).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7c53e1c commit cd72769

2 files changed

Lines changed: 81 additions & 62 deletions
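
For context, the pattern the commit moves to looks roughly like the standalone sketch below. This is not code from this repository; it assumes duckdb-rs 1.4 with the `appender-arrow` feature enabled and uses the arrow types re-exported by the `duckdb` crate. The table name `t` and the 10,000-row batch are illustrative.

```rust
use duckdb::arrow::array::{ArrayRef, Int64Array};
use duckdb::arrow::datatypes::{DataType, Field, Schema};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::{Connection, Result};
use std::sync::Arc;

fn main() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute("CREATE TEMP TABLE t (\"id\" BIGINT)", [])?;

    // A single batch well past STANDARD_VECTOR_SIZE (2048 rows); the appender
    // slices it internally, so no manual chunking is needed.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids = Int64Array::from_iter_values(0..10_000i64);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids) as ArrayRef]).expect("valid batch");

    let mut appender = conn.appender("t")?;
    appender.append_record_batch(batch)?;
    drop(appender); // flush pending rows before querying

    let rows: i64 = conn.query_row("SELECT count(*) FROM t", [], |r| r.get(0))?;
    assert_eq!(rows, 10_000);
    Ok(())
}
```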

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ polars = { version = "0.52", features = ["lazy", "sql", "ipc"] }
 polars-ops = { version = "0.52", features = ["pivot"] }
 
 # Readers
-duckdb = { version = "1.4", features = ["bundled", "vtab-arrow"] }
+duckdb = { version = "1.4", features = ["bundled", "appender-arrow"] }
 arrow = { version = "56", default-features = false, features = ["ipc"] }
 postgres = "0.19"
 sqlx = { version = "0.8", features = ["postgres", "runtime-tokio-rustls"] }

src/reader/duckdb.rs

Lines changed: 80 additions & 61 deletions
@@ -5,8 +5,9 @@
 use crate::reader::data::init_builtin_data;
 use crate::reader::{connection::ConnectionInfo, Reader};
 use crate::{DataFrame, GgsqlError, Result};
+use arrow::datatypes::DataType as ArrowDataType;
 use arrow::ipc::reader::FileReader;
-use duckdb::vtab::arrow::{arrow_recordbatch_to_query_params, ArrowVTab};
+use arrow::record_batch::RecordBatch;
 use duckdb::{params, Connection};
 use polars::io::SerWriter;
 use polars::prelude::*;
@@ -71,12 +72,6 @@ impl DuckDBReader {
             }
         };
 
-        // Register Arrow virtual table function for DataFrame registration
-        conn.register_table_function::<ArrowVTab>("arrow")
-            .map_err(|e| {
-                GgsqlError::ReaderError(format!("Failed to register arrow function: {}", e))
-            })?;
-
         Ok(Self {
             conn,
             registered_tables: HashSet::new(),
@@ -130,8 +125,8 @@ fn validate_table_name(name: &str) -> Result<()> {
     Ok(())
 }
 
-/// Convert a Polars DataFrame to DuckDB Arrow query parameters via IPC serialization
-fn dataframe_to_arrow_params(df: DataFrame) -> Result<[usize; 2]> {
+/// Convert a Polars DataFrame to an Arrow RecordBatch via IPC serialization
+fn dataframe_to_record_batch(df: &DataFrame) -> Result<RecordBatch> {
     // Serialize DataFrame to IPC format
     let mut buffer = Vec::new();
     {
@@ -155,15 +150,61 @@ fn dataframe_to_arrow_params(df: DataFrame) -> Result<[usize; 2]> {
         ));
     }
 
-    // For single batch, use directly; for multiple, concatenate
-    let rb = if batches.len() == 1 {
-        batches.into_iter().next().unwrap()
+    if batches.len() == 1 {
+        Ok(batches.into_iter().next().unwrap())
     } else {
         arrow::compute::concat_batches(&batches[0].schema(), &batches)
-            .map_err(|e| GgsqlError::ReaderError(format!("Failed to concat batches: {}", e)))?
-    };
+            .map_err(|e| GgsqlError::ReaderError(format!("Failed to concat batches: {}", e)))
+    }
+}
+
+/// Map an Arrow data type to a DuckDB SQL type name
+fn arrow_type_to_duckdb_sql(dt: &ArrowDataType) -> Result<&'static str> {
+    Ok(match dt {
+        ArrowDataType::Boolean => "BOOLEAN",
+        ArrowDataType::Int8 => "TINYINT",
+        ArrowDataType::Int16 => "SMALLINT",
+        ArrowDataType::Int32 => "INTEGER",
+        ArrowDataType::Int64 => "BIGINT",
+        ArrowDataType::UInt8 => "UTINYINT",
+        ArrowDataType::UInt16 => "USMALLINT",
+        ArrowDataType::UInt32 => "UINTEGER",
+        ArrowDataType::UInt64 => "UBIGINT",
+        ArrowDataType::Float16 | ArrowDataType::Float32 => "FLOAT",
+        ArrowDataType::Float64 => "DOUBLE",
+        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => "VARCHAR",
+        ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => "BLOB",
+        ArrowDataType::Date32 | ArrowDataType::Date64 => "DATE",
+        ArrowDataType::Timestamp(_, _) => "TIMESTAMP",
+        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "TIME",
+        ArrowDataType::Duration(_) => "INTERVAL",
+        ArrowDataType::Null => "INTEGER",
+        _ => {
+            return Err(GgsqlError::ReaderError(format!(
+                "Unsupported Arrow type for DuckDB registration: {:?}",
+                dt
+            )))
+        }
+    })
+}
+
+/// Generate a CREATE TEMP TABLE statement from an Arrow RecordBatch schema
+fn create_table_ddl(name: &str, rb: &RecordBatch) -> Result<String> {
+    let schema = rb.schema();
+    let columns: Vec<String> = schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let sql_type = arrow_type_to_duckdb_sql(field.data_type())?;
+            Ok(format!("\"{}\" {}", field.name(), sql_type))
+        })
+        .collect::<Result<Vec<_>>>()?;
 
-    Ok(arrow_recordbatch_to_query_params(rb))
+    Ok(format!(
+        "CREATE TEMP TABLE \"{}\" ({})",
+        name,
+        columns.join(", ")
+    ))
 }
 
 /// Helper struct for building typed columns from rows
@@ -516,52 +557,30 @@ impl Reader for DuckDBReader {
             )));
         }
 
-        // DuckDB's Arrow virtual table function (in duckdb-rs) writes an entire
-        // RecordBatch into a single DataChunk whose vectors have a fixed capacity
-        // of STANDARD_VECTOR_SIZE (2048). Passing a RecordBatch with more rows
-        // causes a panic. Work around this by chunking large DataFrames.
-        const MAX_ARROW_BATCH_ROWS: usize = 2048;
-        let total_rows = df.height();
-
-        if total_rows <= MAX_ARROW_BATCH_ROWS {
-            // Small DataFrame: register in a single batch
-            let params = dataframe_to_arrow_params(df)?;
-            let sql = format!(
-                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
-                name
-            );
-            self.conn.execute(&sql, params).map_err(|e| {
-                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+        // Convert to Arrow RecordBatch, create the table, and bulk-insert via
+        // the Appender API. The appender automatically chunks large batches to
+        // stay within DuckDB's internal STANDARD_VECTOR_SIZE (2048) limit, so we
+        // don't need to manage chunking ourselves.
+        let rb = dataframe_to_record_batch(&df)?;
+
+        let ddl = create_table_ddl(name, &rb)?;
+        self.conn.execute(&ddl, []).map_err(|e| {
+            GgsqlError::ReaderError(format!("Failed to create table '{}': {}", name, e))
+        })?;
+
+        if rb.num_rows() > 0 {
+            let mut appender = self.conn.appender(name).map_err(|e| {
+                GgsqlError::ReaderError(format!(
+                    "Failed to create appender for table '{}': {}",
+                    name, e
+                ))
             })?;
-        } else {
-            // Large DataFrame: create table from first chunk, then insert remaining chunks
-            let first_chunk = df.slice(0, MAX_ARROW_BATCH_ROWS);
-            let params = dataframe_to_arrow_params(first_chunk)?;
-            let create_sql = format!(
-                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
-                name
-            );
-            self.conn.execute(&create_sql, params).map_err(|e| {
-                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+            appender.append_record_batch(rb).map_err(|e| {
+                GgsqlError::ReaderError(format!(
+                    "Failed to append data to table '{}': {}",
+                    name, e
+                ))
             })?;
-
-            let mut offset = MAX_ARROW_BATCH_ROWS;
-            while offset < total_rows {
-                let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total_rows - offset);
-                let chunk = df.slice(offset as i64, chunk_size);
-                let params = dataframe_to_arrow_params(chunk)?;
-                let insert_sql = format!(
-                    "INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)",
-                    name
-                );
-                self.conn.execute(&insert_sql, params).map_err(|e| {
-                    GgsqlError::ReaderError(format!(
-                        "Failed to insert chunk into table '{}': {}",
-                        name, e
-                    ))
-                })?;
-                offset += chunk_size;
-            }
         }
 
         // Track the table so we can unregister it later
822841

823842
#[test]
824843
fn test_register_large_dataframe() {
825-
// duckdb-rs Arrow vtab has a vector capacity of 2048 rows. DataFrames
826-
// larger than this must be chunked to avoid a panic.
844+
// Verify that the appender handles DataFrames larger than DuckDB's
845+
// STANDARD_VECTOR_SIZE (2048 rows) without panicking.
827846
let mut reader = DuckDBReader::from_connection_string("duckdb://memory").unwrap();
828847

829848
let n = 3000;
