Skip to content

Commit 80d7558

Browse files
committed
optimize array_resize
1 parent 3a23bb2 commit 80d7558

3 files changed

Lines changed: 365 additions & 4 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,7 @@ name = "array_set_ops"
101101
[[bench]]
102102
harness = false
103103
name = "array_position"
104+
105+
[[bench]]
106+
harness = false
107+
name = "array_resize"
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, Int64Array, ListArray};
19+
use arrow::buffer::OffsetBuffer;
20+
use arrow::datatypes::{DataType, Field};
21+
use criterion::{
22+
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup,
23+
Criterion,
24+
};
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
27+
use datafusion_functions_nested::resize::ArrayResize;
28+
use std::hint::black_box;
29+
use std::sync::Arc;
30+
31+
const NUM_ROWS: usize = 1_000;
32+
33+
fn criterion_benchmark(c: &mut Criterion) {
34+
let mut group = c.benchmark_group("array_resize_i64");
35+
let list_field: Arc<Field> =
36+
Field::new_list_field(DataType::Int64, true).into();
37+
let list_data_type = DataType::List(Arc::clone(&list_field));
38+
let arg_fields = vec![
39+
Field::new("array", list_data_type.clone(), true).into(),
40+
Field::new("size", DataType::Int64, false).into(),
41+
Field::new("value", DataType::Int64, true).into(),
42+
];
43+
let return_field: Arc<Field> =
44+
Field::new("result", list_data_type, true).into();
45+
let config_options = Arc::new(ConfigOptions::default());
46+
let two_arg_fields = arg_fields[..2].to_vec();
47+
48+
bench_case(
49+
&mut group,
50+
"grow_uniform_fill_10_to_500",
51+
vec![
52+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
53+
ColumnarValue::Array(repeated_int64_array(500)),
54+
ColumnarValue::Array(repeated_int64_array(7)),
55+
],
56+
arg_fields.clone(),
57+
return_field.clone(),
58+
config_options.clone(),
59+
);
60+
61+
bench_case(
62+
&mut group,
63+
"shrink_uniform_fill_500_to_10",
64+
vec![
65+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 500)),
66+
ColumnarValue::Array(repeated_int64_array(10)),
67+
ColumnarValue::Array(repeated_int64_array(7)),
68+
],
69+
arg_fields.clone(),
70+
return_field.clone(),
71+
config_options.clone(),
72+
);
73+
74+
bench_case(
75+
&mut group,
76+
"grow_default_null_fill_10_to_500",
77+
vec![
78+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
79+
ColumnarValue::Array(repeated_int64_array(500)),
80+
],
81+
two_arg_fields,
82+
return_field.clone(),
83+
config_options.clone(),
84+
);
85+
86+
bench_case(
87+
&mut group,
88+
"grow_variable_fill_10_to_500",
89+
vec![
90+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
91+
ColumnarValue::Array(repeated_int64_array(500)),
92+
ColumnarValue::Array(distinct_fill_array()),
93+
],
94+
arg_fields.clone(),
95+
return_field.clone(),
96+
config_options.clone(),
97+
);
98+
99+
bench_case(
100+
&mut group,
101+
"mixed_grow_shrink_1000x_100",
102+
vec![
103+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 100)),
104+
ColumnarValue::Array(mixed_size_array()),
105+
],
106+
arg_fields[..2].to_vec(),
107+
return_field,
108+
config_options,
109+
);
110+
111+
group.finish();
112+
}
113+
114+
fn bench_case(
115+
group: &mut BenchmarkGroup<'_, WallTime>,
116+
name: &str,
117+
args: Vec<ColumnarValue>,
118+
arg_fields: Vec<Arc<Field>>,
119+
return_field: Arc<Field>,
120+
config_options: Arc<ConfigOptions>,
121+
) {
122+
let udf = ArrayResize::new();
123+
group.bench_function(name, |b| {
124+
b.iter(|| {
125+
black_box(
126+
udf.invoke_with_args(ScalarFunctionArgs {
127+
args: args.clone(),
128+
arg_fields: arg_fields.clone(),
129+
number_rows: NUM_ROWS,
130+
return_field: return_field.clone(),
131+
config_options: config_options.clone(),
132+
})
133+
.unwrap(),
134+
)
135+
})
136+
});
137+
}
138+
139+
fn create_int64_list_array(num_rows: usize, list_len: usize) -> ArrayRef {
140+
let values = (0..(num_rows * list_len))
141+
.map(|v| Some(v as i64))
142+
.collect::<Int64Array>();
143+
let offsets = (0..=num_rows)
144+
.map(|i| (i * list_len) as i32)
145+
.collect::<Vec<i32>>();
146+
147+
Arc::new(
148+
ListArray::try_new(
149+
Arc::new(Field::new_list_field(DataType::Int64, true)),
150+
OffsetBuffer::new(offsets.into()),
151+
Arc::new(values),
152+
None,
153+
)
154+
.unwrap(),
155+
)
156+
}
157+
158+
fn repeated_int64_array(value: i64) -> ArrayRef {
159+
Arc::new(Int64Array::from_value(value, NUM_ROWS))
160+
}
161+
162+
fn distinct_fill_array() -> ArrayRef {
163+
Arc::new(Int64Array::from_iter((0..NUM_ROWS).map(|i| Some(i as i64))))
164+
}
165+
166+
fn mixed_size_array() -> ArrayRef {
167+
Arc::new(Int64Array::from_iter((0..NUM_ROWS).map(|i| {
168+
Some(if i % 2 == 0 { 200_i64 } else { 10_i64 })
169+
})))
170+
}
171+
172+
criterion_group!(benches, criterion_benchmark);
173+
criterion_main!(benches);

datafusion/functions-nested/src/resize.rs

Lines changed: 188 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
2020
use crate::utils::make_scalar_function;
2121
use arrow::array::{
22-
Array, ArrayRef, Capacities, GenericListArray, Int64Array, MutableArrayData,
23-
NullBufferBuilder, OffsetSizeTrait, new_null_array,
22+
new_null_array, Array, ArrayRef, Capacities, GenericListArray, Int64Array,
23+
MutableArrayData, NullBufferBuilder, OffsetSizeTrait,
2424
};
2525
use arrow::buffer::OffsetBuffer;
2626
use arrow::datatypes::DataType;
@@ -31,7 +31,7 @@ use arrow::datatypes::{
3131
};
3232
use datafusion_common::cast::{as_int64_array, as_large_list_array, as_list_array};
3333
use datafusion_common::utils::ListCoercion;
34-
use datafusion_common::{Result, ScalarValue, exec_err, internal_datafusion_err};
34+
use datafusion_common::{exec_err, internal_datafusion_err, Result, ScalarValue};
3535
use datafusion_expr::{
3636
ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, Documentation,
3737
ScalarUDFImpl, Signature, TypeSignature, Volatility,
@@ -206,7 +206,120 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
206206
let values = array.values();
207207
let original_data = values.to_data();
208208

209-
// create default element array
209+
// Track the largest per-row growth so the uniform-fill fast path can
210+
// materialize one reusable fill buffer of the required size.
211+
let mut max_extra: usize = 0;
212+
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
213+
if array.is_null(row_index) {
214+
continue;
215+
}
216+
let target_count =
217+
count_array.value(row_index).to_usize().ok_or_else(|| {
218+
internal_datafusion_err!(
219+
"array_resize: failed to convert size to usize"
220+
)
221+
})?;
222+
let current_len = (offset_window[1] - offset_window[0]).to_usize().unwrap();
223+
if target_count > current_len {
224+
let extra = target_count - current_len;
225+
if extra > max_extra {
226+
max_extra = extra;
227+
}
228+
}
229+
}
230+
231+
// The fast path is valid when at least one row grows and every row would
232+
// use the same fill value.
233+
let is_uniform_fill = max_extra > 0
234+
&& match &default_element {
235+
None => true,
236+
Some(fill_array) => {
237+
let len = fill_array.len();
238+
let null_count = fill_array.logical_null_count();
239+
240+
len <= 1
241+
|| null_count == len
242+
|| (null_count == 0 && {
243+
let first = fill_array.slice(0, 1);
244+
(1..len)
245+
.all(|i| fill_array.slice(i, 1).as_ref() == first.as_ref())
246+
})
247+
}
248+
};
249+
250+
// Fast path: at least one row needs to grow and all rows share
251+
// the same fill value.
252+
if is_uniform_fill {
253+
let fill_scalar = match &default_element {
254+
None => ScalarValue::try_from(&data_type)?,
255+
Some(fill_array) if fill_array.logical_null_count() == fill_array.len() => {
256+
ScalarValue::try_from(&data_type)?
257+
}
258+
Some(fill_array) => {
259+
ScalarValue::try_from_array(fill_array.as_ref(), 0)?
260+
}
261+
};
262+
let default_element = fill_scalar.to_array_of_size(max_extra)?;
263+
let default_value_data = default_element.to_data();
264+
265+
let capacity =
266+
Capacities::Array(original_data.len() + default_value_data.len());
267+
let mut offsets = vec![O::usize_as(0)];
268+
let mut mutable = MutableArrayData::with_capacities(
269+
vec![&original_data, &default_value_data],
270+
false,
271+
capacity,
272+
);
273+
274+
let mut null_builder = NullBufferBuilder::new(array.len());
275+
276+
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
277+
if array.is_null(row_index) {
278+
null_builder.append_null();
279+
offsets.push(offsets[row_index]);
280+
continue;
281+
}
282+
null_builder.append_non_null();
283+
284+
let count = count_array.value(row_index).to_usize().ok_or_else(|| {
285+
internal_datafusion_err!(
286+
"array_resize: failed to convert size to usize"
287+
)
288+
})?;
289+
let count = O::usize_as(count);
290+
let start = offset_window[0];
291+
if start + count > offset_window[1] {
292+
let extra_count =
293+
(start + count - offset_window[1]).to_usize().unwrap();
294+
let end = offset_window[1];
295+
mutable.extend(
296+
0,
297+
(start).to_usize().unwrap(),
298+
(end).to_usize().unwrap(),
299+
);
300+
mutable.extend(1, 0, extra_count);
301+
} else {
302+
let end = start + count;
303+
mutable.extend(
304+
0,
305+
(start).to_usize().unwrap(),
306+
(end).to_usize().unwrap(),
307+
);
308+
};
309+
offsets.push(offsets[row_index] + count);
310+
}
311+
312+
let data = mutable.freeze();
313+
314+
return Ok(Arc::new(GenericListArray::<O>::try_new(
315+
Arc::clone(field),
316+
OffsetBuffer::<O>::new(offsets.into()),
317+
arrow::array::make_array(data),
318+
null_builder.finish(),
319+
)?));
320+
}
321+
322+
// Slow path: each row may have a different fill value.
210323
let default_element = if let Some(default_element) = default_element {
211324
default_element
212325
} else {
@@ -268,3 +381,74 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
268381
null_builder.finish(),
269382
)?))
270383
}
384+
385+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, ListArray};
    use arrow::datatypes::Int64Type;

    /// Nested-`Option` view of an `Int64` list array: the outer `Option` is
    /// the row validity, the inner ones the element validity.
    type Rows = Vec<Option<Vec<Option<i64>>>>;

    /// Converts an `Int64` list array into plain nested vectors so results
    /// can be compared with `assert_eq!`.
    fn list_values(array: &ListArray) -> Rows {
        let row_to_vec = |values: ArrayRef| -> Vec<Option<i64>> {
            values.as_primitive::<Int64Type>().iter().collect()
        };
        array.iter().map(|row| row.map(row_to_vec)).collect()
    }

    /// Resizes the rows `[1]` and `[2]` to lengths 3 and 2 respectively,
    /// using `fill` as the per-row fill values, and returns the result rows.
    fn resize_two_rows(fill: Vec<i64>) -> Result<Rows> {
        let list = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2)]),
        ]);
        let new_len = Int64Array::from(vec![3, 2]);
        let fill = Int64Array::from(fill);

        let args: Vec<ArrayRef> =
            vec![Arc::new(list), Arc::new(new_len), Arc::new(fill)];
        let resized = array_resize_inner(&args)?;
        Ok(list_values(resized.as_list::<i32>()))
    }

    #[test]
    fn test_array_resize_preserves_row_fill_values() -> Result<()> {
        // Different fill values per row: each row must be padded with its own.
        let actual = resize_two_rows(vec![9, 8])?;

        let expected = vec![
            Some(vec![Some(1), Some(9), Some(9)]),
            Some(vec![Some(2), Some(8)]),
        ];
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn test_array_resize_uniform_fill_fast_path() -> Result<()> {
        // The same fill value on every row.
        let actual = resize_two_rows(vec![9, 9])?;

        let expected = vec![
            Some(vec![Some(1), Some(9), Some(9)]),
            Some(vec![Some(2), Some(9)]),
        ];
        assert_eq!(expected, actual);
        Ok(())
    }
}

0 commit comments

Comments
 (0)