@@ -59,48 +59,29 @@ void PixelsScanFunction::PixelsScanImplementation(ClientContext &context,
5959 auto &bind_data = (PixelsReadBindData &)*data_p.bind_data ;
6060
6161 do {
62- if (data.currPixelsRecordReader .get () == nullptr ||
63- (data.currPixelsRecordReader ->isEndOfFile () && data.rowOffset >= data.vectorizedRowBatch ->rowCount )) {
64- if (data.vectorizedRowBatch .get () != nullptr ) {
65- data.vectorizedRowBatch ->close ();
66- }
67- if (data.currPixelsRecordReader .get () != nullptr ) {
62+ if (data.currPixelsRecordReader == nullptr ||
63+ (data.currPixelsRecordReader ->isEndOfFile () && data.vectorizedRowBatch ->isEndOfFile ())) {
64+ if (data.currPixelsRecordReader != nullptr ) {
6865 data.currPixelsRecordReader .reset ();
6966 }
7067 if (!PixelsParallelStateNext (context, bind_data, data, gstate)) {
7168 return ;
72- } else {
73- if (!data.is_last_state ) {
74- PixelsReaderOption option;
75- option.setSkipCorruptRecords (true );
76- option.setTolerantSchemaEvolution (true );
77- option.setEnableEncodedColumnVector (true );
78- option.setFilter (gstate.filters );
79- option.setEnabledFilterPushDown (enable_filter_pushdown);
80- // includeCols comes from the caller of PixelsPageSource
81- option.setIncludeCols (data.column_names );
82- option.setRGRange (0 , data.nextReader ->getRowGroupNum ());
83- option.setQueryId (1 );
84- data.nextPixelsRecordReader = data.nextReader ->read (option);
85- }
8669 }
8770 }
8871 auto currPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(data.currPixelsRecordReader );
8972 auto nextPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(data.nextPixelsRecordReader );
9073
91- if (data.vectorizedRowBatch != nullptr && data.rowOffset >= data. vectorizedRowBatch ->rowCount ) {
74+ if (data.vectorizedRowBatch != nullptr && data.vectorizedRowBatch ->isEndOfFile () ) {
9275 data.vectorizedRowBatch = nullptr ;
9376 }
9477 if (data.vectorizedRowBatch == nullptr ) {
95- currPixelsRecordReader->asyncReadComplete (data.column_names .size ());
96- if (!data.is_last_state ) {
97- nextPixelsRecordReader->read ();
98- }
9978 data.vectorizedRowBatch = currPixelsRecordReader->readBatch (false );
100- data.rowOffset = 0 ;
10179 }
80+ uint64_t currentLoc = data.vectorizedRowBatch ->position ();
10281 std::shared_ptr<TypeDescription> resultSchema = data.currPixelsRecordReader ->getResultSchema ();
103- auto thisOutputChunkRows = MinValue<idx_t >(STANDARD_VECTOR_SIZE, data.vectorizedRowBatch ->rowCount - data.rowOffset );
82+ uint64_t remaining = data.vectorizedRowBatch ->rowCount - currentLoc;
83+ assert (remaining > 0 );
84+ auto thisOutputChunkRows = MinValue<idx_t >(STANDARD_VECTOR_SIZE, remaining);
10485 output.SetCardinality (thisOutputChunkRows);
10586 std::shared_ptr<pixelsFilterMask> filterMask =
10687 std::static_pointer_cast<PixelsRecordReaderImpl>(data.currPixelsRecordReader )->getFilterMask ();
@@ -113,14 +94,13 @@ void PixelsScanFunction::PixelsScanImplementation(ClientContext &context,
11394 SelectionVector sel;
11495 sel.Initialize (thisOutputChunkRows);
11596 for (idx_t i = 0 ; i < thisOutputChunkRows; i++) {
116- if (filterMask->get (i + data. rowOffset )) {
97+ if (filterMask->get (i + currentLoc )) {
11798 sel.set_index (sel_size++, i);
11899 }
119100 }
120101 output.Slice (sel, sel_size);
121102 }
122103
123- data.rowOffset += thisOutputChunkRows;
124104 if (output.size () > 0 ) {
125105 return ;
126106 } else {
@@ -193,42 +173,18 @@ unique_ptr<LocalTableFunctionState> PixelsScanFunction::PixelsScanInitLocal(
193173
194174 result->column_ids = input.column_ids ;
195175
196-
197176 auto fieldNames = bind_data.fileSchema ->getFieldNames ();
198177
199-
200178 for (column_t column_id : input.column_ids ) {
201179 if (!IsRowIdColumnId (column_id)) {
202180 result->column_names .emplace_back (fieldNames.at (column_id));
203181 }
204182 }
205- result->is_first_state = true ;
206- result->is_last_state = false ;
207- result->next_file_index = 0 ;
208- result->next_batch_index = 0 ;
209- result->curr_file_index = 0 ;
210- result->curr_batch_index = 0 ;
211- if (!PixelsParallelStateNext (context.client , bind_data, *result, gstate)) {
183+
184+ ::DirectUringRandomAccessFile::Initialize ();
185+ if (!PixelsParallelStateNext (context.client , bind_data, *result, gstate, true )) {
212186 return nullptr ;
213187 }
214- PixelsReaderOption option;
215- option.setSkipCorruptRecords (true );
216- option.setTolerantSchemaEvolution (true );
217- option.setEnableEncodedColumnVector (true );
218- option.setEnabledFilterPushDown (enable_filter_pushdown);
219- option.setFilter (gstate.filters );
220- // includeCols comes from the caller of PixelsPageSource
221- option.setIncludeCols (result->column_names );
222- option.setRGRange (0 , result->nextReader ->getRowGroupNum ());
223- option.setQueryId (1 );
224-
225- // option.setBatchSize(STANDARD_VECTOR_SIZE);
226- result->nextPixelsRecordReader = result->nextReader ->read (option);
227-
228- result->vectorizedRowBatch = nullptr ;
229- ::DirectUringRandomAccessFile::Initialize ();
230- auto nextPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(result->nextPixelsRecordReader );
231- nextPixelsRecordReader->read ();
232188 return std::move (result);
233189}
234190
@@ -287,7 +243,6 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
287243 uint64_t thisOutputChunkRows) {
288244
289245 int row_batch_id = 0 ;
290- int row_offset = data.rowOffset ;
291246 auto column_ids = data.column_ids ;
292247 auto vectorizedRowBatch = data.vectorizedRowBatch ;
293248 for (uint64_t col_id = 0 ; col_id < column_ids.size (); col_id++) {
@@ -307,7 +262,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
307262 case TypeDescription::INT: {
308263 auto intCol = std::static_pointer_cast<LongColumnVector>(col);
309264 Vector vector (LogicalType::INTEGER,
310- (data_ptr_t )(intCol->intVector + row_offset ));
265+ (data_ptr_t )(intCol->current () ));
311266 output.data .at (col_id).Reference (vector);
312267// auto result_ptr = FlatVector::GetData<int>(output.data.at(col_id));
313268// memcpy(result_ptr, intCol->intVector + row_offset, thisOutputChunkRows * sizeof(int));
@@ -320,7 +275,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
320275 case TypeDescription::LONG: {
321276 auto longCol = std::static_pointer_cast<LongColumnVector>(col);
322277 Vector vector (LogicalType::BIGINT,
323- (data_ptr_t )(longCol->longVector + row_offset ));
278+ (data_ptr_t )(longCol->current () ));
324279 output.data .at (col_id).Reference (vector);
325280// auto result_ptr = FlatVector::GetData<long>(output.data.at(col_id));
326281// memcpy(result_ptr, longCol->longVector + row_offset, thisOutputChunkRows * sizeof(long));
@@ -336,7 +291,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
336291 case TypeDescription::DECIMAL:{
337292 auto decimalCol = std::static_pointer_cast<DecimalColumnVector>(col);
338293 Vector vector (LogicalType::DECIMAL (colSchema->getPrecision (), colSchema->getScale ()),
339- (data_ptr_t )(decimalCol->vector + row_offset ));
294+ (data_ptr_t )(decimalCol->current () ));
340295 output.data .at (col_id).Reference (vector);
341296// auto result_ptr = FlatVector::GetData<long>(output.data.at(col_id));
342297// memcpy(result_ptr, decimalCol->vector + row_offset, thisOutputChunkRows * sizeof(long));
@@ -351,7 +306,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
351306 case TypeDescription::DATE:{
352307 auto dateCol = std::static_pointer_cast<DateColumnVector>(col);
353308 Vector vector (LogicalType::DATE,
354- (data_ptr_t )(dateCol->dates + row_offset ));
309+ (data_ptr_t )(dateCol->current () ));
355310 output.data .at (col_id).Reference (vector);
356311// auto result_ptr = FlatVector::GetData<int>(output.data.at(col_id));
357312// memcpy(result_ptr, dateCol->dates + row_offset, thisOutputChunkRows * sizeof(int));
@@ -374,7 +329,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
374329 {
375330 auto binaryCol = std::static_pointer_cast<BinaryColumnVector>(col);
376331 Vector vector (LogicalType::VARCHAR,
377- (data_ptr_t )(binaryCol->vector + row_offset ));
332+ (data_ptr_t )(binaryCol->current () ));
378333 output.data .at (col_id).Reference (vector);
379334// auto result_ptr = FlatVector::GetData<duckdb::string_t>(output.data.at(col_id));
380335// memcpy(result_ptr, binaryCol->vector + row_offset, thisOutputChunkRows * sizeof(string_t));
@@ -385,13 +340,15 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState & data,
385340// default:
386341// throw InvalidArgumentException("bad column type " + std::to_string(colSchema->getCategory()));
387342 }
343+ col->increment (thisOutputChunkRows);
388344 row_batch_id++;
389345 }
390346}
391347
392348bool PixelsScanFunction::PixelsParallelStateNext (ClientContext &context, const PixelsReadBindData &bind_data,
393349 PixelsReadLocalState &scan_data,
394- PixelsReadGlobalState ¶llel_state) {
350+ PixelsReadGlobalState ¶llel_state,
351+ bool is_init_state) {
395352 unique_lock<mutex> parallel_lock (parallel_state.lock );
396353 if (parallel_state.error_opening_file ) {
397354 throw InvalidArgumentException (" PixelsScanInitLocal: file open error." );
@@ -403,8 +360,8 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
403360 // 2. When PixelsScanImplementation invokes this function (scan_data.next_file_index > -1), if
404361 // scan_data.next_file_index >= (int) parallel_state.readers.size(), it means the current file is already
405362 // done, so the function return false.
406- if ((scan_data. is_first_state && parallel_state.file_index >= parallel_state.readers .size ()) ||
407- scan_data.is_last_state ) {
363+ if ((is_init_state && parallel_state.file_index >= parallel_state.readers .size ()) ||
364+ scan_data.next_file_index >= parallel_state. readers . size () ) {
408365 ::BufferPool::Reset ();
409366 // if async io is enabled, we need to unregister uring buffer
410367 if (ConfigFactory::Instance ().boolCheckProperty (" localfs.enable.async.io" )) {
@@ -423,18 +380,24 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
423380 scan_data.next_file_index = parallel_state.file_index ;
424381 scan_data.next_batch_index = scan_data.next_file_index ;
425382 scan_data.curr_file_name = scan_data.next_file_name ;
426- scan_data.is_first_state = false ;
427383 parallel_state.file_index ++;
428384 parallel_lock.unlock ();
429385 // The below code uses global state but no race happens, so we don't need the lock anymore
430386
431387
432- if (scan_data.currReader . get () != nullptr ) {
388+ if (scan_data.currReader != nullptr ) {
433389 scan_data.currReader ->close ();
434390 }
435391
392+ ::BufferPool::Switch ();
393+
436394 scan_data.currReader = scan_data.nextReader ;
437395 scan_data.currPixelsRecordReader = scan_data.nextPixelsRecordReader ;
396+ // asyncReadComplete is not invoked in the first run (is_init_state = true)
397+ if (scan_data.currPixelsRecordReader != nullptr ) {
398+ auto currPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(scan_data.currPixelsRecordReader );
399+ currPixelsRecordReader->asyncReadComplete ((int )scan_data.column_names .size ());
400+ }
438401 if (scan_data.next_file_index < parallel_state.readers .size ()) {
439402 if (parallel_state.readers [scan_data.next_file_index ]) {
440403 scan_data.next_file_name = bind_data.files .at (scan_data.next_file_index );
@@ -450,14 +413,33 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, const P
450413 ->build ();
451414 parallel_state.readers [scan_data.next_file_index ] = scan_data.nextReader ;
452415 }
416+
417+ PixelsReaderOption option = GetPixelsReaderOption (scan_data, parallel_state);
418+ scan_data.nextPixelsRecordReader = scan_data.nextReader ->read (option);
419+ auto nextPixelsRecordReader = std::static_pointer_cast<PixelsRecordReaderImpl>(scan_data.nextPixelsRecordReader );
420+ nextPixelsRecordReader->read ();
453421 } else {
454- scan_data.is_last_state = true ;
455422 scan_data.nextReader = nullptr ;
456423 scan_data.nextPixelsRecordReader = nullptr ;
457424 }
458- ::BufferPool::Switch ();
425+
426+
459427 return true ;
460428}
461429
430+ PixelsReaderOption PixelsScanFunction::GetPixelsReaderOption (PixelsReadLocalState &local_state, PixelsReadGlobalState &global_state) {
431+ PixelsReaderOption option;
432+ option.setSkipCorruptRecords (true );
433+ option.setTolerantSchemaEvolution (true );
434+ option.setEnableEncodedColumnVector (true );
435+ option.setFilter (global_state.filters );
436+ option.setEnabledFilterPushDown (enable_filter_pushdown);
437+ // includeCols comes from the caller of PixelsPageSource
438+ option.setIncludeCols (local_state.column_names );
439+ option.setRGRange (0 , local_state.nextReader ->getRowGroupNum ());
440+ option.setQueryId (1 );
441+ return option;
442+ }
443+
462444
463445}
0 commit comments