Skip to content

Commit 6ba8c7c

Browse files
committed
refactor deduplicate fill paths
1 parent cf8d870 commit 6ba8c7c

1 file changed

Lines changed: 54 additions & 71 deletions

File tree

datafusion/functions-nested/src/resize.rs

Lines changed: 54 additions & 71 deletions
Original file line number | Diff line number | Diff line change
@@ -246,82 +246,73 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
246246
}
247247
};
248248

249-
// Fast path: at least one row needs to grow and all rows share
250-
// the same fill value.
251249
if use_bulk_fill {
250+
// Fast path: materialize one reusable fill buffer for all grown rows.
252251
let fill_scalar = match &default_element {
253252
None => ScalarValue::try_from(&data_type)?,
254253
Some(fill_array) if fill_array.logical_null_count() == fill_array.len() => {
255254
ScalarValue::try_from(&data_type)?
256255
}
257256
Some(fill_array) => ScalarValue::try_from_array(fill_array.as_ref(), 0)?,
258257
};
259-
let default_element = fill_scalar.to_array_of_size(max_extra)?;
260-
let default_value_data = default_element.to_data();
261-
262-
let capacity = Capacities::Array(output_values_len);
263-
let mut offsets = vec![O::usize_as(0)];
264-
let mut mutable = MutableArrayData::with_capacities(
265-
vec![&original_data, &default_value_data],
266-
false,
267-
capacity,
268-
);
269-
270-
let mut null_builder = NullBufferBuilder::new(array.len());
271-
272-
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
273-
if array.is_null(row_index) {
274-
null_builder.append_null();
275-
offsets.push(offsets[row_index]);
276-
continue;
258+
let fill_values = fill_scalar.to_array_of_size(max_extra)?;
259+
let default_value_data = fill_values.to_data();
260+
build_resized_list(
261+
array,
262+
count_array,
263+
field,
264+
&original_data,
265+
&default_value_data,
266+
output_values_len,
267+
|mutable, _, extra_count| mutable.extend(1, 0, extra_count),
268+
)
269+
} else {
270+
// Slow path: rows may need different fill values, so append from the
271+
// corresponding slot in the input fill array for each grown element.
272+
let fill_values = match default_element {
273+
Some(fill_values) => fill_values,
274+
None => {
275+
let null_scalar = ScalarValue::try_from(&data_type)?;
276+
null_scalar.to_array_of_size(original_data.len())?
277277
}
278-
null_builder.append_non_null();
279-
280-
let count = count_array.value(row_index).to_usize().ok_or_else(|| {
281-
internal_datafusion_err!("array_resize: failed to convert size to usize")
282-
})?;
283-
let count = O::usize_as(count);
284-
let start = offset_window[0];
285-
if start + count > offset_window[1] {
286-
let extra_count = (start + count - offset_window[1]).to_usize().unwrap();
287-
let end = offset_window[1];
288-
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
289-
mutable.extend(1, 0, extra_count);
290-
} else {
291-
let end = start + count;
292-
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
293-
};
294-
offsets.push(offsets[row_index] + count);
295-
}
296-
297-
let data = mutable.freeze();
298-
299-
return Ok(Arc::new(GenericListArray::<O>::try_new(
300-
Arc::clone(field),
301-
OffsetBuffer::<O>::new(offsets.into()),
302-
arrow::array::make_array(data),
303-
null_builder.finish(),
304-
)?));
278+
};
279+
let default_value_data = fill_values.to_data();
280+
build_resized_list(
281+
array,
282+
count_array,
283+
field,
284+
&original_data,
285+
&default_value_data,
286+
output_values_len,
287+
|mutable, row_index, extra_count| {
288+
for _ in 0..extra_count {
289+
mutable.extend(1, row_index, row_index + 1);
290+
}
291+
},
292+
)
305293
}
294+
}
306295

307-
// Slow path: each row may have a different fill value.
308-
let default_element = if let Some(default_element) = default_element {
309-
default_element
310-
} else {
311-
let null_scalar = ScalarValue::try_from(&data_type)?;
312-
null_scalar.to_array_of_size(original_data.len())?
313-
};
314-
let default_value_data = default_element.to_data();
315-
316-
// create a mutable array to store the original data
296+
fn build_resized_list<O, F>(
297+
array: &GenericListArray<O>,
298+
count_array: &Int64Array,
299+
field: &FieldRef,
300+
original_data: &arrow::array::ArrayData,
301+
default_value_data: &arrow::array::ArrayData,
302+
output_values_len: usize,
303+
mut append_fill_values: F,
304+
) -> Result<ArrayRef>
305+
where
306+
O: OffsetSizeTrait + TryInto<i64>,
307+
F: FnMut(&mut MutableArrayData, usize, usize),
308+
{
317309
let capacity = Capacities::Array(output_values_len);
318310
let mut offsets = vec![O::usize_as(0)];
319311
let mut mutable = MutableArrayData::with_capacities(
320-
vec![&original_data, &default_value_data],
312+
vec![original_data, default_value_data],
321313
false,
322314
capacity,
323315
);
324-
325316
let mut null_builder = NullBufferBuilder::new(array.len());
326317

327318
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
@@ -338,21 +329,13 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
338329
let count = O::usize_as(count);
339330
let start = offset_window[0];
340331
if start + count > offset_window[1] {
341-
let extra_count =
342-
(start + count - offset_window[1]).try_into().map_err(|_| {
343-
internal_datafusion_err!(
344-
"array_resize: failed to convert size to i64"
345-
)
346-
})?;
332+
let extra_count = (start + count - offset_window[1]).to_usize().unwrap();
347333
let end = offset_window[1];
348-
mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap());
349-
// append default element
350-
for _ in 0..extra_count {
351-
mutable.extend(1, row_index, row_index + 1);
352-
}
334+
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
335+
append_fill_values(&mut mutable, row_index, extra_count);
353336
} else {
354337
let end = start + count;
355-
mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap());
338+
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
356339
};
357340
offsets.push(offsets[row_index] + count);
358341
}

0 commit comments

Comments
 (0)