 use crate::reader::data::init_builtin_data;
 use crate::reader::{connection::ConnectionInfo, Reader};
 use crate::{DataFrame, GgsqlError, Result};
-use arrow::datatypes::DataType as ArrowDataType;
 use arrow::ipc::reader::FileReader;
-use arrow::record_batch::RecordBatch;
+use duckdb::vtab::arrow::{arrow_recordbatch_to_query_params, ArrowVTab};
 use duckdb::{params, Connection};
 use polars::io::SerWriter;
 use polars::prelude::*;
@@ -72,6 +71,12 @@ impl DuckDBReader {
             }
         };
 
+        // Register Arrow virtual table function for DataFrame registration
+        conn.register_table_function::<ArrowVTab>("arrow")
+            .map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register arrow function: {}", e))
+            })?;
+
         Ok(Self {
             conn,
             registered_tables: HashSet::new(),
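Note: once the `arrow` vtab is registered on the connection, SQL can scan an
in-memory Arrow RecordBatch through `arrow(?, ?)`, whose two parameters are the
pointer pair produced by `arrow_recordbatch_to_query_params`. A minimal sketch
of the pattern the hunks below build on (`rb` is an illustrative RecordBatch
already in hand):

    let params = arrow_recordbatch_to_query_params(rb);
    conn.execute("CREATE TEMP TABLE t AS SELECT * FROM arrow(?, ?)", params)?;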
@@ -125,8 +130,8 @@ fn validate_table_name(name: &str) -> Result<()> {
     Ok(())
 }
 
-/// Convert a Polars DataFrame to an Arrow RecordBatch via IPC serialization
-fn dataframe_to_record_batch(df: &DataFrame) -> Result<RecordBatch> {
+/// Convert a Polars DataFrame to DuckDB Arrow query parameters via IPC serialization
+fn dataframe_to_arrow_params(df: DataFrame) -> Result<[usize; 2]> {
     // Serialize DataFrame to IPC format
     let mut buffer = Vec::new();
     {
@@ -150,61 +155,15 @@ fn dataframe_to_record_batch(df: &DataFrame) -> Result<RecordBatch> {
         ));
     }
 
-    if batches.len() == 1 {
-        Ok(batches.into_iter().next().unwrap())
+    // For single batch, use directly; for multiple, concatenate
+    let rb = if batches.len() == 1 {
+        batches.into_iter().next().unwrap()
     } else {
         arrow::compute::concat_batches(&batches[0].schema(), &batches)
-            .map_err(|e| GgsqlError::ReaderError(format!("Failed to concat batches: {}", e)))
-    }
-}
-
-/// Map an Arrow data type to a DuckDB SQL type name
-fn arrow_type_to_duckdb_sql(dt: &ArrowDataType) -> Result<&'static str> {
-    Ok(match dt {
-        ArrowDataType::Boolean => "BOOLEAN",
-        ArrowDataType::Int8 => "TINYINT",
-        ArrowDataType::Int16 => "SMALLINT",
-        ArrowDataType::Int32 => "INTEGER",
-        ArrowDataType::Int64 => "BIGINT",
-        ArrowDataType::UInt8 => "UTINYINT",
-        ArrowDataType::UInt16 => "USMALLINT",
-        ArrowDataType::UInt32 => "UINTEGER",
-        ArrowDataType::UInt64 => "UBIGINT",
-        ArrowDataType::Float16 | ArrowDataType::Float32 => "FLOAT",
-        ArrowDataType::Float64 => "DOUBLE",
-        ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => "VARCHAR",
-        ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => "BLOB",
-        ArrowDataType::Date32 | ArrowDataType::Date64 => "DATE",
-        ArrowDataType::Timestamp(_, _) => "TIMESTAMP",
-        ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => "TIME",
-        ArrowDataType::Duration(_) => "INTERVAL",
-        ArrowDataType::Null => "INTEGER",
-        _ => {
-            return Err(GgsqlError::ReaderError(format!(
-                "Unsupported Arrow type for DuckDB registration: {:?}",
-                dt
-            )))
-        }
-    })
-}
-
-/// Generate a CREATE TEMP TABLE statement from an Arrow RecordBatch schema
-fn create_table_ddl(name: &str, rb: &RecordBatch) -> Result<String> {
-    let schema = rb.schema();
-    let columns: Vec<String> = schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let sql_type = arrow_type_to_duckdb_sql(field.data_type())?;
-            Ok(format!("\"{}\" {}", field.name(), sql_type))
-        })
-        .collect::<Result<Vec<_>>>()?;
+            .map_err(|e| GgsqlError::ReaderError(format!("Failed to concat batches: {}", e)))?
+    };
 
-    Ok(format!(
-        "CREATE TEMP TABLE \"{}\" ({})",
-        name,
-        columns.join(", ")
-    ))
+    Ok(arrow_recordbatch_to_query_params(rb))
 }
 
 /// Helper struct for building typed columns from rows
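The portion of `dataframe_to_arrow_params` elided between the two hunks above
round-trips the DataFrame through Arrow IPC, which is what the `SerWriter` and
`FileReader` imports serve. A sketch of that middle section, assuming polars'
`IpcWriter`; the exact error messages are illustrative:

    // buffer: Vec<u8> from above; df is taken by value
    let mut df = df;
    IpcWriter::new(&mut buffer)
        .finish(&mut df)
        .map_err(|e| GgsqlError::ReaderError(format!("Failed to serialize DataFrame: {}", e)))?;
    let reader = FileReader::try_new(std::io::Cursor::new(buffer), None)
        .map_err(|e| GgsqlError::ReaderError(format!("Failed to create IPC reader: {}", e)))?;
    let batches: Vec<_> = reader
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| GgsqlError::ReaderError(format!("Failed to read batches: {}", e)))?;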
@@ -557,30 +516,52 @@ impl Reader for DuckDBReader {
             )));
         }
 
-        // Convert to Arrow RecordBatch, create the table, and bulk-insert via
-        // the Appender API. The appender automatically chunks large batches to
-        // stay within DuckDB's internal STANDARD_VECTOR_SIZE (2048) limit, so we
-        // don't need to manage chunking ourselves.
-        let rb = dataframe_to_record_batch(&df)?;
-
-        let ddl = create_table_ddl(name, &rb)?;
-        self.conn.execute(&ddl, []).map_err(|e| {
-            GgsqlError::ReaderError(format!("Failed to create table '{}': {}", name, e))
-        })?;
-
-        if rb.num_rows() > 0 {
-            let mut appender = self.conn.appender(name).map_err(|e| {
-                GgsqlError::ReaderError(format!(
-                    "Failed to create appender for table '{}': {}",
-                    name, e
-                ))
+        // DuckDB's Arrow virtual table function (in duckdb-rs) writes an entire
+        // RecordBatch into a single DataChunk whose vectors have a fixed capacity
+        // of STANDARD_VECTOR_SIZE (2048). Passing a RecordBatch with more rows
+        // causes a panic. Work around this by chunking large DataFrames.
+        const MAX_ARROW_BATCH_ROWS: usize = 2048;
+        let total_rows = df.height();
+
+        if total_rows <= MAX_ARROW_BATCH_ROWS {
+            // Small DataFrame: register in a single batch
+            let params = dataframe_to_arrow_params(df)?;
+            let sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
             })?;
-            appender.append_record_batch(rb).map_err(|e| {
-                GgsqlError::ReaderError(format!(
-                    "Failed to append data to table '{}': {}",
-                    name, e
-                ))
+        } else {
+            // Large DataFrame: create table from first chunk, then insert remaining chunks
+            let first_chunk = df.slice(0, MAX_ARROW_BATCH_ROWS);
+            let params = dataframe_to_arrow_params(first_chunk)?;
+            let create_sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&create_sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
             })?;
+
+            let mut offset = MAX_ARROW_BATCH_ROWS;
+            while offset < total_rows {
+                let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total_rows - offset);
+                let chunk = df.slice(offset as i64, chunk_size);
+                let params = dataframe_to_arrow_params(chunk)?;
+                let insert_sql = format!(
+                    "INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)",
+                    name
+                );
+                self.conn.execute(&insert_sql, params).map_err(|e| {
+                    GgsqlError::ReaderError(format!(
+                        "Failed to insert chunk into table '{}': {}",
+                        name, e
+                    ))
+                })?;
+                offset += chunk_size;
+            }
         }
 
         // Track the table so we can unregister it later
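Worked example of the chunk math: for a 3000-row DataFrame (the size used in
the test below), the CREATE consumes rows 0..2048; the loop then runs once with
chunk_size = min(2048, 3000 - 2048) = 952, inserts rows 2048..3000, and exits
once offset reaches 3000.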
@@ -841,8 +822,8 @@ mod tests {
 
     #[test]
     fn test_register_large_dataframe() {
-        // Verify that the appender handles DataFrames larger than DuckDB's
-        // STANDARD_VECTOR_SIZE (2048 rows) without panicking.
+        // duckdb-rs Arrow vtab has a vector capacity of 2048 rows. DataFrames
+        // larger than this must be chunked to avoid a panic.
         let mut reader = DuckDBReader::from_connection_string("duckdb://memory").unwrap();
 
         let n = 3000;
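The test body is truncated in this view. A plausible continuation, sketched
under assumptions: the column construction follows polars conventions, and
`register` is a hypothetical name for the Reader method exercised here:

    // Hypothetical continuation: build an n-row frame and register it,
    // which must not panic despite n exceeding 2048
    let values: Vec<i64> = (0..n as i64).collect();
    let df = DataFrame::new(vec![Series::new("x", values)]).unwrap();
    reader.register("big", df).unwrap();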