From cb45f8ab921254141c783b53d63b505ff4655aa7 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 4 Apr 2026 19:24:46 +0100 Subject: [PATCH 1/2] Add bench for first_last --- datafusion/functions-aggregate/Cargo.toml | 4 + .../functions-aggregate/benches/first_last.rs | 194 ++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 datafusion/functions-aggregate/benches/first_last.rs diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 07ee89af84a0e..8aa7976ab1abd 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -79,3 +79,7 @@ name = "min_max_bytes" [[bench]] name = "approx_distinct" harness = false + +[[bench]] +name = "first_last" +harness = false diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs new file mode 100644 index 0000000000000..8c888329f814b --- /dev/null +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray}; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Int64Type, Schema}; +use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; + +use datafusion_expr::{ + AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, +}; +use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; +use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::expressions::col; + +use criterion::{Criterion, criterion_group, criterion_main}; + +fn prepare_groups_accumulator(is_first: bool) -> Box { + let schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Int64, true), + Field::new("ord", DataType::Int64, true), + ])); + + let order_expr = col("ord", &schema).unwrap(); + let sort_expr = PhysicalSortExpr { + expr: order_expr, + options: SortOptions::default(), + }; + + let value_field: Arc = Field::new("value", DataType::Int64, true).into(); + let accumulator_args = AccumulatorArgs { + return_field: Arc::clone(&value_field), + schema: &schema, + expr_fields: &[value_field], + ignore_nulls: false, + order_bys: std::slice::from_ref(&sort_expr), + is_reversed: false, + name: if is_first { + "FIRST_VALUE(value ORDER BY ord)" + } else { + "LAST_VALUE(value ORDER BY ord)" + }, + is_distinct: false, + exprs: &[col("value", &schema).unwrap()], + }; + + if is_first { + FirstValue::new() + .create_groups_accumulator(accumulator_args) + .unwrap() + } else { + LastValue::new() + .create_groups_accumulator(accumulator_args) + .unwrap() + } +} + +#[expect(clippy::needless_pass_by_value)] +fn convert_to_state_bench( + c: &mut Criterion, + is_first: bool, + name: &str, + values: ArrayRef, + opt_filter: Option<&BooleanArray>, +) { + c.bench_function(name, |b| { + b.iter(|| { + let accumulator = prepare_groups_accumulator(is_first); + black_box( + accumulator + .convert_to_state(std::slice::from_ref(&values), opt_filter) + .unwrap(), + ) + }) + }); +} + +#[expect(clippy::needless_pass_by_value)] +#[expect(clippy::too_many_arguments)] +fn evaluate_bench( + c: &mut Criterion, + is_first: bool, + emit_to: EmitTo, + name: &str, + values: ArrayRef, + ord: ArrayRef, + opt_filter: Option<&BooleanArray>, + num_groups: usize, +) { + let n = values.len(); + let group_indices: Vec = (0..n).map(|i| i % num_groups).collect(); + + c.bench_function(name, |b| { + b.iter_batched( + || { + // setup, not timed + let mut accumulator = prepare_groups_accumulator(is_first); + accumulator + .update_batch( + &[Arc::clone(&values), Arc::clone(&ord)], + &group_indices, + opt_filter, + num_groups, + ) + .unwrap(); + accumulator + }, + |mut accumulator| black_box(accumulator.evaluate(emit_to).unwrap()), + criterion::BatchSize::SmallInput, + ) + }); +} + +fn first_last_benchmark(c: &mut Criterion) { + const N: usize = 65536; + const NUM_GROUPS: usize = 1024; + + assert_eq!(N % NUM_GROUPS, 0); + + for is_first in [true, false] { + for pct in [0, 90] { + let fn_name = if is_first { + "first_value" + } else { + "last_value" + }; + + let null_density = (pct as f32) / 100.0; + let values = Arc::new(create_primitive_array::(N, null_density)) + as ArrayRef; + let ord = Arc::new(create_primitive_array::(N, null_density)) + as ArrayRef; + + for with_filter in [false, true] { + let filter = create_boolean_array(N, 0.0, 0.5); + let opt_filter = if with_filter { Some(&filter) } else { None }; + + convert_to_state_bench( + c, + is_first, + &format!( + "{fn_name} convert_to_state nulls={pct}%, filter={with_filter}" + ), + values.clone(), + opt_filter, + ); + evaluate_bench( + c, + is_first, + EmitTo::First(2), + &format!( + "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, first(2)" + ), + values.clone(), + ord.clone(), + opt_filter, + NUM_GROUPS, + ); + evaluate_bench( + c, + is_first, + EmitTo::All, + &format!( + "{fn_name} evaluate_bench nulls={pct}%, filter={with_filter}, all" + ), + values.clone(), + ord.clone(), + opt_filter, + NUM_GROUPS, + ); + } + } + } +} + +criterion_group!(benches, first_last_benchmark); +criterion_main!(benches); From c0ca6b6093b499db3e321289bda0edfc940df3e3 Mon Sep 17 00:00:00 2001 From: theirix Date: Mon, 6 Apr 2026 11:43:58 +0100 Subject: [PATCH 2/2] Add bench for simple Accumulator --- .../functions-aggregate/benches/first_last.rs | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/benches/first_last.rs b/datafusion/functions-aggregate/benches/first_last.rs index 8c888329f814b..6070470138aa6 100644 --- a/datafusion/functions-aggregate/benches/first_last.rs +++ b/datafusion/functions-aggregate/benches/first_last.rs @@ -24,7 +24,7 @@ use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::util::bench_util::{create_boolean_array, create_primitive_array}; use datafusion_expr::{ - AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, + Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, function::AccumulatorArgs, }; use datafusion_functions_aggregate::first_last::{FirstValue, LastValue}; use datafusion_physical_expr::PhysicalSortExpr; @@ -72,6 +72,42 @@ fn prepare_groups_accumulator(is_first: bool) -> Box { } } +fn prepare_accumulator(is_first: bool) -> Box { + let schema = Arc::new(Schema::new(vec![ + Field::new("value", DataType::Int64, true), + Field::new("ord", DataType::Int64, true), + ])); + + let order_expr = col("ord", &schema).unwrap(); + let sort_expr = PhysicalSortExpr { + expr: order_expr, + options: SortOptions::default(), + }; + + let value_field: Arc = Field::new("value", DataType::Int64, true).into(); + let accumulator_args = AccumulatorArgs { + return_field: Arc::clone(&value_field), + schema: &schema, + expr_fields: &[value_field], + ignore_nulls: false, + order_bys: std::slice::from_ref(&sort_expr), + is_reversed: false, + name: if is_first { + "FIRST_VALUE(value ORDER BY ord)" + } else { + "LAST_VALUE(value ORDER BY ord)" + }, + is_distinct: false, + exprs: &[col("value", &schema).unwrap()], + }; + + if is_first { + FirstValue::new().accumulator(accumulator_args).unwrap() + } else { + LastValue::new().accumulator(accumulator_args).unwrap() + } +} + #[expect(clippy::needless_pass_by_value)] fn convert_to_state_bench( c: &mut Criterion, @@ -92,6 +128,30 @@ fn convert_to_state_bench( }); } +#[expect(clippy::needless_pass_by_value)] +fn evaluate_accumulator_bench( + c: &mut Criterion, + is_first: bool, + name: &str, + values: ArrayRef, + ord: ArrayRef, +) { + c.bench_function(name, |b| { + b.iter_batched( + || { + // setup, not timed + let mut accumulator = prepare_accumulator(is_first); + accumulator + .update_batch(&[Arc::clone(&values), Arc::clone(&ord)]) + .unwrap(); + accumulator + }, + |mut accumulator| black_box(accumulator.evaluate().unwrap()), + criterion::BatchSize::SmallInput, + ) + }); +} + #[expect(clippy::needless_pass_by_value)] #[expect(clippy::too_many_arguments)] fn evaluate_bench( @@ -148,6 +208,14 @@ fn first_last_benchmark(c: &mut Criterion) { let ord = Arc::new(create_primitive_array::(N, null_density)) as ArrayRef; + evaluate_accumulator_bench( + c, + is_first, + &format!("{fn_name} evaluate_accumulator_bench nulls={pct}%"), + values.clone(), + ord.clone(), + ); + for with_filter in [false, true] { let filter = create_boolean_array(N, 0.0, 0.5); let opt_filter = if with_filter { Some(&filter) } else { None };