This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8c0068320ba [enhance](agg) Support max/min agg functions for
type_array (#58490)
8c0068320ba is described below
commit 8c0068320ba0100a61c5951c83ac0b34d8f7319d
Author: admiring_xm <[email protected]>
AuthorDate: Wed Dec 3 12:23:41 2025 +0800
[enhance](agg) Support max/min agg functions for type_array (#58490)
Support max/min agg functions for type_array
Issue Number: https://github.com/apache/doris/issues/58417
---
.../aggregate_function_min_max.cpp | 57 ++---
.../aggregate_function_min_max.h | 238 +++++++++++++--------
be/src/vec/common/string_buffer.hpp | 10 +
.../trees/expressions/functions/agg/Max.java | 7 +-
.../trees/expressions/functions/agg/Min.java | 7 +-
.../test_aggregate_all_functions2.out | 13 ++
.../test_aggregate_all_functions2.groovy | 59 +++++
7 files changed, 275 insertions(+), 116 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.cpp
b/be/src/vec/aggregate_functions/aggregate_function_min_max.cpp
index ccdbd9dfe45..4faf6643d78 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.cpp
@@ -129,35 +129,37 @@ AggregateFunctionPtr
create_aggregate_function_single_value(const String& name,
return creator_without_type::create_unary_arguments<
AggregateFunctionsSingleValue<Data<SingleValueDataDecimal<TYPE_DECIMAL256>>>>(
argument_types, result_is_nullable, attr);
- default:
- return nullptr;
- }
-}
-
-// any_value
-template <template <typename> class Data>
-AggregateFunctionPtr create_aggregate_function_single_value_any_value_function(
- const String& name, const DataTypes& argument_types, const bool
result_is_nullable,
- const AggregateFunctionAttr& attr) {
- AggregateFunctionPtr res = create_aggregate_function_single_value<Data>(
- name, argument_types, result_is_nullable, attr);
- if (res) {
- return res;
- }
- const DataTypePtr& argument_type = remove_nullable(argument_types[0]);
- if (argument_type->get_primitive_type() == PrimitiveType::TYPE_ARRAY ||
- argument_type->get_primitive_type() == PrimitiveType::TYPE_MAP ||
- argument_type->get_primitive_type() == PrimitiveType::TYPE_STRUCT ||
- argument_type->get_primitive_type() == PrimitiveType::TYPE_AGG_STATE ||
- argument_type->get_primitive_type() == PrimitiveType::TYPE_BITMAP ||
- argument_type->get_primitive_type() == PrimitiveType::TYPE_HLL ||
- argument_type->get_primitive_type() ==
PrimitiveType::TYPE_QUANTILE_STATE) {
+ case PrimitiveType::TYPE_ARRAY:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_MAP:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_STRUCT:
return creator_without_type::create_unary_arguments<
- AggregateFunctionsSingleValue<SingleValueDataComplexType>>(
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_AGG_STATE:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_BITMAP:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_HLL:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ case PrimitiveType::TYPE_QUANTILE_STATE:
+ return creator_without_type::create_unary_arguments<
+
AggregateFunctionsSingleValue<Data<SingleValueDataComplexType>>>(
+ argument_types, result_is_nullable, attr);
+ default:
+ return nullptr;
}
-
- return nullptr;
}
void register_aggregate_function_minmax(AggregateFunctionSimpleFactory&
factory) {
@@ -166,8 +168,7 @@ void
register_aggregate_function_minmax(AggregateFunctionSimpleFactory& factory)
factory.register_function_both(
"min",
create_aggregate_function_single_value<AggregateFunctionMinData>);
factory.register_function_both(
- "any",
-
create_aggregate_function_single_value_any_value_function<AggregateFunctionAnyData>);
+ "any",
create_aggregate_function_single_value<AggregateFunctionAnyData>);
factory.register_alias("any", "any_value");
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index cd100f366b7..102ee88d3d7 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -36,6 +36,7 @@
#include "runtime/type_limit.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
#include "vec/columns/column_fixed_length_object.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
@@ -522,12 +523,152 @@ public:
}
};
+struct SingleValueDataComplexType {
+private:
+ using Self = SingleValueDataComplexType;
+
+ DataTypePtr column_type;
+ bool has_value = false;
+ MutableColumnPtr column_data; // a column ptr only save a single value
+ int be_exec_version = -1;
+
+public:
+ SingleValueDataComplexType() = default;
+ SingleValueDataComplexType(const DataTypes& argument_types, int
be_version) {
+ column_type = argument_types[0];
+ column_data = column_type->create_column();
+ be_exec_version = be_version;
+ }
+
+ bool has() const { return has_value; }
+
+ constexpr static bool IsFixedLength = false;
+
+ void insert_result_into(IColumn& to) const {
+ if (has()) {
+ to.insert_from(*column_data, 0);
+ } else {
+ to.insert_default();
+ }
+ }
+
+ void reset() {
+ has_value = false;
+ column_data->clear();
+ }
+
+ void write(BufferWritable& buf) const {
+ buf.write_binary(has_value);
+ if (!has()) {
+ return;
+ }
+ auto size_bytes =
+ column_type->get_uncompressed_serialized_bytes(*column_data,
be_exec_version);
+ buf.write_binary(size_bytes);
+ buf.resize(size_bytes);
+ auto* p = column_type->serialize(*column_data, buf.data(),
be_exec_version);
+ DCHECK_EQ(p, buf.data() + size_bytes);
+ buf.add_offset(size_bytes);
+ }
+
+ void read(BufferReadable& buf, Arena& arena) {
+ buf.read_binary(has_value);
+ if (!has()) {
+ return;
+ }
+ int64_t size;
+ buf.read_binary(size);
+ const auto* p = column_type->deserialize(buf.data(), &column_data,
be_exec_version);
+ DCHECK_EQ(p, buf.data() + size);
+ buf.add_offset(size);
+ }
+
+ void change(const IColumn& column, size_t row_num, Arena&) {
+ has_value = true;
+ column_data->clear();
+ column_data->insert_from(column, row_num);
+ }
+
+ /// Assuming to.has()
+ void change(const Self& to, Arena&) {
+ has_value = true;
+ column_data->clear();
+ column_data->insert_from(*to.column_data, 0);
+ }
+
+ bool change_if_less(const IColumn& column, size_t row_num, Arena& arena) {
+ if (!has() || column_data->compare_at(0, row_num, column, -1) == 1) {
+ change(column, row_num, arena);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool change_if_less(const Self& to, Arena& arena) {
+ if (to.has() && (!has() || column_data->compare_at(0, 0,
*to.column_data, -1) == 1)) {
+ change(to, arena);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool change_if_greater(const IColumn& column, size_t row_num, Arena&
arena) {
+ if (!has() || column_data->compare_at(0, row_num, column, -1) == -1) {
+ change(column, row_num, arena);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool change_if_greater(const Self& to, Arena& arena) {
+ if (to.has() && (!has() || column_data->compare_at(0, 0,
*to.column_data, -1) == -1)) {
+ change(to, arena);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool check_if_equal(const IColumn& column, size_t row_num) const {
+ if (!has()) {
+ return false;
+ }
+ auto type = column_type->get_primitive_type();
+ if (type == TYPE_BITMAP || type == TYPE_HLL || type ==
TYPE_QUANTILE_STATE ||
+ type == TYPE_AGG_STATE) {
+ return false;
+ } else {
+ return !column_data->compare_at(0, row_num, column, -1);
+ }
+ }
+
+ void change_first_time(const IColumn& column, size_t row_num, Arena&
arena) {
+ if (UNLIKELY(!has())) {
+ change(column, row_num, arena);
+ }
+ }
+
+ void change_first_time(const Self& to, Arena& arena) {
+ if (UNLIKELY(!has() && to.has())) {
+ change(to, arena);
+ }
+ }
+};
+
template <typename Data>
struct AggregateFunctionMaxData : public Data {
using Self = AggregateFunctionMaxData;
using Data::IsFixedLength;
constexpr static bool IS_ANY = false;
+ AggregateFunctionMaxData(const DataTypes& argument_types, int be_version)
+ : Data(argument_types, be_version) {
+ this->reset();
+ }
+
AggregateFunctionMaxData() { reset(); }
void change_if_better(const IColumn& column, size_t row_num, Arena& arena)
{
@@ -556,6 +697,11 @@ struct AggregateFunctionMinData : Data {
using Data::IsFixedLength;
constexpr static bool IS_ANY = false;
+ AggregateFunctionMinData(const DataTypes& argument_types, int be_version)
+ : Data(argument_types, be_version) {
+ this->reset();
+ }
+
AggregateFunctionMinData() { reset(); }
void change_if_better(const IColumn& column, size_t row_num, Arena& arena)
{
@@ -584,99 +730,17 @@ struct AggregateFunctionAnyData : Data {
using Data::IsFixedLength;
static const char* name() { return "any"; }
constexpr static bool IS_ANY = true;
- void change_if_better(const IColumn& column, size_t row_num, Arena& arena)
{
- this->change_first_time(column, row_num, arena);
- }
-
- void change_if_better(const Self& to, Arena& arena) {
this->change_first_time(to, arena); }
-};
-
-// this is used for complex type about any_value function
-struct SingleValueDataComplexType {
- static const char* name() { return "any"; }
- constexpr static bool IS_ANY = true;
- constexpr static bool IsFixedLength = false;
- using Self = SingleValueDataComplexType;
-
- SingleValueDataComplexType() = default;
-
- SingleValueDataComplexType(const DataTypes& argument_types, int
be_version) {
- column_type = argument_types[0];
- column_data = column_type->create_column();
- be_exec_version = be_version;
- }
-
- bool has() const { return has_value; }
-
- void change_first_time(const IColumn& column, size_t row_num, Arena&) {
- if (UNLIKELY(!has())) {
- change_impl(column, row_num);
- }
- }
-
- void change_first_time(const Self& to, Arena&) {
- if (UNLIKELY(!has() && to.has())) {
- change_impl(*to.column_data, 0);
- }
- }
-
- void change_impl(const IColumn& column, size_t row_num) {
- DCHECK_EQ(column_data->size(), 0);
- column_data->insert_from(column, row_num);
- has_value = true;
- }
-
- void insert_result_into(IColumn& to) const {
- if (has()) {
- to.insert_from(*column_data, 0);
- } else {
- to.insert_default();
- }
- }
-
- void reset() {
- column_data->clear();
- has_value = false;
- }
- void write(BufferWritable& buf) const {
- buf.write_binary(has());
- if (!has()) {
- return;
- }
- auto size_bytes =
- column_type->get_uncompressed_serialized_bytes(*column_data,
be_exec_version);
- std::string memory_buffer(size_bytes, '0');
- auto* p = column_type->serialize(*column_data, memory_buffer.data(),
be_exec_version);
- buf.write_binary(memory_buffer);
- DCHECK_EQ(p, memory_buffer.data() + size_bytes);
- }
+ AggregateFunctionAnyData(const DataTypes& argument_types, int be_version)
+ : Data(argument_types, be_version) {};
- void read(BufferReadable& buf, Arena& arena) {
- buf.read_binary(has_value);
- if (!has()) {
- return;
- }
- std::string memory_buffer;
- buf.read_binary(memory_buffer);
- const auto* p =
- column_type->deserialize(memory_buffer.data(), &column_data,
be_exec_version);
- DCHECK_EQ(p, memory_buffer.data() + memory_buffer.size());
- }
+ AggregateFunctionAnyData() {};
void change_if_better(const IColumn& column, size_t row_num, Arena& arena)
{
this->change_first_time(column, row_num, arena);
}
void change_if_better(const Self& to, Arena& arena) {
this->change_first_time(to, arena); }
-
- bool check_if_equal(const IColumn& column, size_t row_num) const { return
false; }
-
-private:
- bool has_value = false;
- MutableColumnPtr column_data;
- DataTypePtr column_type;
- int be_exec_version = -1;
};
template <typename Data>
@@ -693,7 +757,9 @@ public:
type(this->argument_types[0]) {}
void create(AggregateDataPtr __restrict place) const override {
- if constexpr (std::is_same_v<Data, SingleValueDataComplexType>) {
+ if constexpr (std::is_same_v<Data,
AggregateFunctionMaxData<SingleValueDataComplexType>> ||
+ std::is_same_v<Data,
AggregateFunctionMinData<SingleValueDataComplexType>> ||
+ std::is_same_v<Data,
AggregateFunctionAnyData<SingleValueDataComplexType>>) {
new (place) Data(argument_types, IAggregateFunction::version);
} else {
new (place) Data;
diff --git a/be/src/vec/common/string_buffer.hpp
b/be/src/vec/common/string_buffer.hpp
index 37d2cae1ce7..ad944f94faa 100644
--- a/be/src/vec/common/string_buffer.hpp
+++ b/be/src/vec/common/string_buffer.hpp
@@ -52,6 +52,12 @@ public:
_now_offset = 0;
}
+ char* data() { return reinterpret_cast<char*>(_data.data() + _now_offset +
_offsets.back()); }
+
+ void add_offset(size_t len) { _now_offset += len; }
+
+ void resize(size_t size) { _data.resize(size + _now_offset +
_offsets.back()); }
+
template <typename T>
void write_number(T data) {
fmt::memory_buffer buffer;
@@ -235,6 +241,10 @@ public:
_data += len;
}
+ const char* data() { return _data; }
+
+ void add_offset(size_t len) { _data += len; }
+
void read_var_uint(UInt64& x) {
x = 0;
// get length from first byte firstly
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Max.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Max.java
index eae94f395cf..600ea1d939a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Max.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Max.java
@@ -57,11 +57,16 @@ public class Max extends NullableAggregateFunction
@Override
public void checkLegalityBeforeTypeCoercion() {
- if (getArgumentType(0).isOnlyMetricType()) {
+ if (getArgumentType(0).isOnlyMetricType() &&
!getArgumentType(0).isArrayType()) {
throw new AnalysisException(Type.OnlyMetricTypeErrorMsg);
}
}
+ @Override
+ public void checkLegalityAfterRewrite() {
+ checkLegalityBeforeTypeCoercion();
+ }
+
@Override
public FunctionSignature customSignature() {
DataType dataType = getArgument(0).getDataType();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Min.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Min.java
index 78700e7ed11..515e2e7323f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Min.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Min.java
@@ -58,11 +58,16 @@ public class Min extends NullableAggregateFunction
@Override
public void checkLegalityBeforeTypeCoercion() {
- if (getArgumentType(0).isOnlyMetricType()) {
+ if (getArgumentType(0).isOnlyMetricType() &&
!getArgumentType(0).isArrayType()) {
throw new AnalysisException(Type.OnlyMetricTypeErrorMsg);
}
}
+ @Override
+ public void checkLegalityAfterRewrite() {
+ checkLegalityBeforeTypeCoercion();
+ }
+
@Override
public FunctionSignature customSignature() {
DataType dataType = getArgument(0).getDataType();
diff --git
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.out
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.out
index aad0d7f0286..7df61ecbf6e 100644
---
a/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.out
+++
b/regression-test/data/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.out
@@ -372,3 +372,16 @@ true
-- !select_histogram_k13_all_null --
{"num_buckets":0,"buckets":[]}
+-- !maxmin_array_1 --
+\N \N
+
+-- !maxmin_array_2 --
+[7] [1, 2, 3]
+[1, 2, 3, 4] [1, 2]
+[11, 22, 33, 44] [3, 1]
+[10] []
+[11, null, null, 55] [1, null, null, 4]
+
+-- !maxmin_array_3 --
+[[3, 4], [3, 4]] [[1, 2], [3, 4]]
+
diff --git
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
index 5da30a1ae04..e2f186a61f3 100644
---
a/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
+++
b/regression-test/suites/query_p0/sql_functions/aggregate_functions/test_aggregate_all_functions2.groovy
@@ -271,4 +271,63 @@ suite("test_aggregate_all_functions2") {
qt_select_histogram_k11_all_null """SELECT histogram(k11, 11) FROM
baseall"""
qt_select_histogram_k12_all_null """SELECT histogram(k12, 12) FROM
baseall"""
qt_select_histogram_k13_all_null """SELECT histogram(k13, 13) FROM
baseall"""
+
+ sql "DROP TABLE IF EXISTS test_maxmin";
+
+ sql """
+ CREATE TABLE test_maxmin (
+ id INT,
+ arr ARRAY<INT>,
+ mp MAP<STRING, INT>,
+ st STRUCT<a: INT, b: STRING>,
+ weight INT
+ ) engine=olap
+ DISTRIBUTED BY HASH(`id`) BUCKETS 4
+ properties("replication_num" = "1","store_row_column" = "true");
+ """
+
+ qt_maxmin_array_1 """SELECT max(arr), min(arr) from test_maxmin"""
+
+ sql """
+ INSERT INTO test_maxmin (id, arr, mp, st, weight) VALUES
+ (1, [1,2,3], {"k1": 10, "k2": 20},
NAMED_STRUCT("a", 1, "b", "alpha"), 5),
+ (1, [5,6], {"k1": 30, "k2": 15},
NAMED_STRUCT("a", 2, "b", "beta"), 10),
+ (1, [7], {"x": 100, "y": 200},
NAMED_STRUCT("a", 3, "b", "gamma"), 3),
+ (2, [1,2,3], {"foo": 1, "bar": 2},
NAMED_STRUCT("a", 5, "b", "echo"), 15),
+ (2, [1,2], {"foo": 2, "bar": 1},
NAMED_STRUCT("a", 6, "b", "zulu"), 7),
+ (2, [1,2,3,4], {"key1": -1, "key2": -5},
NAMED_STRUCT("a", 7, "b", "seven"), 12),
+ (3, [3,1], {"key1": 99, "key2": 98},
NAMED_STRUCT("a", 8, "b", "eight"), 9),
+ (3, [10], {"A": 5, "B": 10},
NAMED_STRUCT("a", 9, "b", "nine"), 6),
+ (3, [11,22,33,44], {"A": 10, "B": 5},
NAMED_STRUCT("a", 10,"b", "ten"), 1),
+ (3, null, null, null,
17),
+ (4, [3,1], {"key1": 99, "key2": 98},
NAMED_STRUCT("a", 8, "b", "eight"), 9),
+ (4, [10], {"A": 5, "B": 10},
NAMED_STRUCT("a", 9, "b", "nine"), 6),
+ (4, [], {"x": 50, "y": 60},
NAMED_STRUCT("a", 4, "b", "delta"), 8),
+ (5, [1,2,3,4], {"A": null, "B": 5},
NAMED_STRUCT("a", null,"b", "ten"), 1),
+ (5, [1,2,null,4], {"A": 100, "B": null},
NAMED_STRUCT("a", 10,"b", null), 1),
+ (5, [1,null,null,4], {"A": null, "B": null},
NAMED_STRUCT("a", null,"b", null), 1),
+ (5, [1,null,3,4], {"A": null, "B": null},
NAMED_STRUCT("a", null,"b", null), 1),
+ (5, [11,null,null,55], {"A": 10, "B": 5},
NAMED_STRUCT("a", 10,"b", "ten"), 1);
+ """
+
+ qt_maxmin_array_2 """SELECT max(arr), min(arr) from test_maxmin group by
id order by id"""
+
+ sql """
+ CREATE TABLE test_nested_maxmin (
+ id INT,
+ arr ARRAY<ARRAY<int>>,
+ weight INT
+ ) engine=olap
+ DISTRIBUTED BY HASH(`id`) BUCKETS 4
+ properties("replication_num" = "1","store_row_column" = "true");
+ """
+
+ sql """
+ INSERT INTO test_nested_maxmin (id, arr, weight) VALUES
+ (1, [[1, 2], [3, 4]], 1),
+ (2, [[1, 2], [5, 6]], 2),
+ (3, [[3, 4], [3, 4]], 3);
+ """
+
+ qt_maxmin_array_3 """SELECT max(arr), min(arr) from test_nested_maxmin"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]