This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview6 by this push:
new af225207951 support aggregate_function_null_v2
af225207951 is described below
commit af22520795162d0ec3adfb96be37dfcd626ed08a
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Jan 22 19:48:33 2026 +0800
support aggregate_function_null_v2
update
fix
format
update
update
update
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 10 -
.../pipeline/exec/aggregation_source_operator.cpp | 3 -
be/src/runtime/runtime_state.h | 5 +
.../vec/aggregate_functions/aggregate_function.h | 58 +-
.../aggregate_function_array_agg.h | 15 -
.../aggregate_functions/aggregate_function_avg.h | 16 +-
.../aggregate_function_bitmap.h | 11 -
.../aggregate_function_bitmap_agg.h | 22 -
.../aggregate_functions/aggregate_function_count.h | 24 +-
.../aggregate_functions/aggregate_function_map.h | 20 -
.../aggregate_function_map_v2.h | 20 -
.../aggregate_function_min_max.h | 19 +-
.../aggregate_function_null_v2.h | 598 +++++++++++++++++++++
.../aggregate_functions/aggregate_function_sum.h | 12 +-
.../aggregate_function_uniq_distribute_key.h | 12 +-
be/src/vec/aggregate_functions/helpers.h | 93 +++-
be/src/vec/exprs/vectorized_agg_fn.cpp | 11 +-
be/src/vec/exprs/vectorized_agg_fn.h | 3 -
.../vec/aggregate_functions/agg_function_test.h | 12 -
.../vec/aggregate_functions/agg_min_max_test.cpp | 13 +-
.../java/org/apache/doris/qe/SessionVariable.java | 9 +
gensrc/thrift/PaloInternalService.thrift | 1 +
22 files changed, 724 insertions(+), 263 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9239e13f7d4..db684d4c35f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -302,9 +302,6 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
- if (column->is_nullable()) {
- column =
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
- }
size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() *
@@ -354,10 +351,6 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
Base::_shared_state->aggregate_evaluators[i]);
}
auto column = block->get_by_position(col_id).column;
- if (column->is_nullable()) {
- column = ((vectorized::ColumnNullable*)column.get())
- ->get_nested_column_ptr();
- }
size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]
->function()
@@ -412,9 +405,6 @@ Status
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
int col_id = AggSharedState::get_slot_column_id(
Base::_shared_state->aggregate_evaluators[i]);
auto column = block->get_by_position(col_id).column;
- if (column->is_nullable()) {
- column =
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
- }
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index b8ecd137016..846bfdf1c12 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -500,9 +500,6 @@ Status
AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i)
{
auto col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
auto column = block->get_by_position(col_id).column;
- if (column->is_nullable()) {
- column =
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
- }
size_t buffer_size =
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() * rows;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 86ccb003279..aa4f6f10b50 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -570,6 +570,11 @@ public:
return _query_options.__isset.enable_parallel_scan &&
_query_options.enable_parallel_scan;
}
+ bool enable_aggregate_function_null_v2() const {
+ return _query_options.__isset.enable_aggregate_function_null_v2 &&
+ _query_options.enable_aggregate_function_null_v2;
+ }
+
bool is_read_csv_empty_line_as_null() const {
return _query_options.__isset.read_csv_empty_line_as_null &&
_query_options.read_csv_empty_line_as_null;
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index cdcef2f248f..a0181210ac2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -46,6 +46,7 @@ class IDataType;
struct AggregateFunctionAttr {
bool is_window_function {false};
bool is_foreach {false};
+ bool enable_aggregate_function_null_v2 {false};
std::vector<std::string> column_names;
};
@@ -131,12 +132,13 @@ public:
virtual void merge(AggregateDataPtr __restrict place,
ConstAggregateDataPtr rhs,
Arena&) const = 0;
- virtual void merge_vec(const AggregateDataPtr* places, size_t offset,
ConstAggregateDataPtr rhs,
- Arena&, const size_t num_rows) const = 0;
+ virtual void merge_vec(const AggregateDataPtr __restrict* __restrict
places, size_t offset,
+ ConstAggregateDataPtr __restrict rhs, Arena&,
+ const size_t num_rows) const = 0;
// same as merge_vec, but only call "merge" function when place is not nullptr
- virtual void merge_vec_selected(const AggregateDataPtr* places, size_t
offset,
- ConstAggregateDataPtr rhs, Arena&,
+ virtual void merge_vec_selected(const AggregateDataPtr __restrict*
__restrict places,
+ size_t offset, ConstAggregateDataPtr
__restrict rhs, Arena&,
const size_t num_rows) const = 0;
/// Serializes state (to transmit it over the network, for example).
@@ -166,9 +168,6 @@ public:
AggregateDataPtr rhs,
const IColumn* column,
Arena&, const size_t
num_rows) const = 0;
- virtual void deserialize_from_column(AggregateDataPtr places, const
IColumn& column, Arena&,
- size_t num_rows) const = 0;
-
/// Deserializes state and merge it with current aggregation function.
virtual void deserialize_and_merge(AggregateDataPtr __restrict place,
AggregateDataPtr __restrict rhs,
BufferReadable& buf,
@@ -178,8 +177,10 @@ public:
const IColumn&
column, size_t begin,
size_t end, Arena&)
const = 0;
- virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict
place,
- const IColumn& column,
Arena&) const = 0;
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena& arena) const {
+ deserialize_and_merge_from_column_range(place, column, 0,
column.size() - 1, arena);
+ }
/// Inserts results into a column.
// todo: Consider whether this passes a ConstAggregateDataPtr
@@ -218,9 +219,6 @@ public:
Arena& arena, UInt8* use_null_result,
UInt8* could_use_previous_result)
const = 0;
- virtual void streaming_agg_serialize(const IColumn** columns,
BufferWritable& buf,
- const size_t num_rows, Arena&) const
= 0;
-
virtual void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
const size_t num_rows,
Arena&) const = 0;
@@ -309,6 +307,9 @@ public:
void destroy_vec(AggregateDataPtr __restrict place,
const size_t num_rows) const noexcept override {
+ if (is_trivial()) {
+ return;
+ }
const size_t size_of_data_ = size_of_data();
const Derived* derived = assert_cast<const Derived*>(this);
for (size_t i = 0; i != num_rows; ++i) {
@@ -419,8 +420,9 @@ public:
serialize_vec(places, offset, writer, num_rows);
}
- void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
- const size_t num_rows, Arena& arena) const
override {
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena&
arena) const override {
+ VectorBufferWriter buf(assert_cast<ColumnString&>(*dst));
std::vector<char> place(size_of_data());
const Derived* derived = assert_cast<const Derived*>(this);
for (size_t i = 0; i != num_rows; ++i) {
@@ -432,12 +434,6 @@ public:
}
}
- void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
- const size_t num_rows, Arena&
arena) const override {
- VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
- streaming_agg_serialize(columns, writer, num_rows, arena);
- }
-
void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
IColumn& to) const override {
VectorBufferWriter writter(assert_cast<ColumnString&>(to));
@@ -516,13 +512,9 @@ public:
derived->destroy_vec(rhs, num_rows);
}
- void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena& arena,
- size_t num_rows) const override {
- deserialize_vec(places, assert_cast<const ColumnString*>(&column),
arena, num_rows);
- }
-
- void merge_vec(const AggregateDataPtr* places, size_t offset,
ConstAggregateDataPtr rhs,
- Arena& arena, const size_t num_rows) const override {
+ void merge_vec(const AggregateDataPtr __restrict* __restrict places,
size_t offset,
+ ConstAggregateDataPtr __restrict rhs, Arena& arena,
+ const size_t num_rows) const override {
const auto* derived = assert_cast<const Derived*>(this);
const auto size_of_data = derived->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
@@ -530,8 +522,8 @@ public:
}
}
- void merge_vec_selected(const AggregateDataPtr* places, size_t offset,
- ConstAggregateDataPtr rhs, Arena& arena,
+ void merge_vec_selected(const AggregateDataPtr __restrict* __restrict
places, size_t offset,
+ ConstAggregateDataPtr __restrict rhs, Arena& arena,
const size_t num_rows) const override {
const auto* derived = assert_cast<const Derived*>(this);
const auto size_of_data = derived->size_of_data();
@@ -561,14 +553,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena& arena) const override {
- if (column.empty()) {
- return;
- }
- deserialize_and_merge_from_column_range(place, column, 0,
column.size() - 1, arena);
- }
-
void deserialize_and_merge(AggregateDataPtr __restrict place,
AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena& arena) const
override {
assert_cast<const Derived*,
TypeCheckOnRelease::DISABLE>(this)->deserialize(rhs, buf,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
index 9831c140e51..439c18d5ac6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
@@ -324,14 +324,6 @@ public:
this->data(place).insert_result_into(to);
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena& arena) const override {
- const size_t num_rows = column.size();
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).deserialize_and_merge(column, i);
- }
- }
-
void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t
offset,
AggregateDataPtr rhs, const IColumn*
column, Arena& arena,
const size_t num_rows) const override {
@@ -340,13 +332,6 @@ public:
}
}
- void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena& arena,
- size_t num_rows) const override {
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(places).deserialize_and_merge(column, i);
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena& arena) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index ea2060b3f5b..8e2f3a8d67c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -235,7 +235,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
+ size_t num_rows) const {
auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
DCHECK(col.size() >= num_rows) << "source column's size should greater
than num_rows";
auto* data = col.get_data().data();
@@ -268,20 +268,6 @@ public:
}
}
- NO_SANITIZE_UNDEFINED void
deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
- const
IColumn& column,
- Arena&) const
override {
- auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
- const size_t num_rows = column.size();
- DCHECK(col.size() >= num_rows) << "source column's size should greater
than num_rows";
- auto* data = reinterpret_cast<const Data*>(col.get_data().data());
-
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).sum += data[i].sum;
- this->data(place).count += data[i].count;
- }
- }
-
NO_SANITIZE_UNDEFINED void deserialize_and_merge_from_column_range(
AggregateDataPtr __restrict place, const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
index a17f1a42ffb..18a1d0480c5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
@@ -187,17 +187,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- const auto& col = assert_cast<const ColumnBitmap&>(column);
- const size_t num_rows = column.size();
- const auto* data = col.get_data().data();
-
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).merge(data[i]);
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
index 473b557f70d..b0cf1c6aa8d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
@@ -146,17 +146,6 @@ public:
}
}
- void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
- auto& col = assert_cast<const ColumnBitmap&>(column);
- DCHECK(col.size() >= num_rows) << "source column's size should greater
than num_rows";
- auto* src = col.get_data().data();
- auto* data = &(this->data(places));
- for (size_t i = 0; i != num_rows; ++i) {
- data[i].value = src[i];
- }
- }
-
void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
MutableColumnPtr& dst, const size_t num_rows)
const override {
auto& col = assert_cast<ColumnBitmap&>(*dst);
@@ -167,17 +156,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnBitmap&>(column);
- const size_t num_rows = column.size();
- auto* data = col.get_data().data();
-
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).value |= data[i];
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 2fc31b0360b..9e7192805d6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -88,7 +88,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
+ size_t num_rows) const {
auto data = assert_cast<const
ColumnFixedLengthObject&>(column).get_data().data();
memcpy(places, data, sizeof(Data) * num_rows);
}
@@ -119,16 +119,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
- const size_t num_rows = column.size();
- auto* data = reinterpret_cast<const Data*>(col.get_data().data());
- for (size_t i = 0; i != num_rows; ++i) {
- AggregateFunctionCount::data(place).count += data[i].count;
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
@@ -243,7 +233,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
+ size_t num_rows) const {
auto data = assert_cast<const
ColumnFixedLengthObject&>(column).get_data().data();
memcpy(places, data, sizeof(Data) * num_rows);
}
@@ -275,16 +265,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
- const size_t num_rows = column.size();
- auto* data = reinterpret_cast<const Data*>(col.get_data().data());
- for (size_t i = 0; i != num_rows; ++i) {
- AggregateFunctionCountNotNullUnary::data(place).count +=
data[i].count;
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map.h b/be/src/vec/aggregate_functions/aggregate_function_map.h
index f30b65bbb41..5e162ef7f35 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map.h
@@ -261,16 +261,6 @@ public:
}
}
- void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
- const auto& col = assert_cast<const ColumnMap&>(column);
- auto* data = &(this->data(places));
- for (size_t i = 0; i != num_rows; ++i) {
- auto map = col[i].get<TYPE_MAP>();
- data->add(map[0], map[1]);
- }
- }
-
void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
MutableColumnPtr& dst, const size_t num_rows)
const override {
for (size_t i = 0; i != num_rows; ++i) {
@@ -279,16 +269,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnMap&>(column);
- const size_t num_rows = column.size();
- for (size_t i = 0; i != num_rows; ++i) {
- auto map = col[i].get<TYPE_MAP>();
- this->data(place).add(map[0], map[1]);
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
index 0c806219dc3..a56c54b4291 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
@@ -235,16 +235,6 @@ public:
}
}
- void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
- const auto& col = assert_cast<const ColumnMap&>(column);
- auto* data = &(this->data(places));
- for (size_t i = 0; i != num_rows; ++i) {
- auto map = col[i].get<TYPE_MAP>();
- data->add(map[0], map[1]);
- }
- }
-
void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
MutableColumnPtr& dst, const size_t num_rows)
const override {
for (size_t i = 0; i != num_rows; ++i) {
@@ -253,16 +243,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- const auto& col = assert_cast<const ColumnMap&>(column);
- const size_t num_rows = column.size();
- for (size_t i = 0; i != num_rows; ++i) {
- auto map = col[i].get<TYPE_MAP>();
- this->data(place).add(map[0], map[1]);
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index 835c2c0c440..7a5b43eca88 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -806,7 +806,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena& arena,
- size_t num_rows) const override {
+ size_t num_rows) const {
if constexpr (Data::IsFixedLength) {
const auto& col = assert_cast<const
ColumnFixedLengthObject&>(column);
auto* column_data = reinterpret_cast<const
Data*>(col.get_data().data());
@@ -815,7 +815,8 @@ public:
data[i] = column_data[i];
}
} else {
- Base::deserialize_from_column(places, column, arena, num_rows);
+ this->deserialize_vec(places, assert_cast<const
ColumnString*>(&column), arena,
+ num_rows);
}
}
@@ -847,20 +848,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena& arena) const override {
- if constexpr (Data::IsFixedLength) {
- const auto& col = assert_cast<const
ColumnFixedLengthObject&>(column);
- auto* column_data = reinterpret_cast<const
Data*>(col.get_data().data());
- const size_t num_rows = column.size();
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).change_if_better(column_data[i], arena);
- }
- } else {
- Base::deserialize_and_merge_from_column(place, column, arena);
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena& arena) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null_v2.h b/be/src/vec/aggregate_functions/aggregate_function_null_v2.h
new file mode 100644
index 00000000000..62fe35dde4e
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_null_v2.h
@@ -0,0 +1,598 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionNull.h
+// and modified by Doris
+
+#pragma once
+
+#include <glog/logging.h>
+
+#include <memory>
+
+#include "common/logging.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_distinct.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_buffer.hpp"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_nullable.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+template <typename NestFunction, bool result_is_nullable, typename Derived>
+class AggregateFunctionNullBaseInlineV2 : public
IAggregateFunctionHelper<Derived> {
+protected:
+ std::unique_ptr<NestFunction> nested_function;
+ size_t prefix_size;
+ bool is_window_function = false;
+
+ AggregateDataPtr nested_place(AggregateDataPtr __restrict place) const
noexcept {
+ return place + prefix_size;
+ }
+
+ ConstAggregateDataPtr nested_place(ConstAggregateDataPtr __restrict place)
const noexcept {
+ return place + prefix_size;
+ }
+
+ static void init(AggregateDataPtr __restrict place, bool
is_window_function) noexcept {
+ init_flag(place);
+ init_null_count(place, is_window_function);
+ }
+
+ static void init_flag(AggregateDataPtr __restrict place) noexcept {
+ if constexpr (result_is_nullable) {
+ place[0] = false;
+ }
+ }
+
+ static void set_flag(AggregateDataPtr __restrict place) noexcept {
+ if constexpr (result_is_nullable) {
+ place[0] = true;
+ }
+ }
+
+ static bool get_flag(ConstAggregateDataPtr __restrict place) noexcept {
+ return result_is_nullable ? place[0] : true;
+ }
+
+ static void init_null_count(AggregateDataPtr __restrict place,
+ bool is_window_function) noexcept {
+ if (is_window_function && result_is_nullable) {
+ unaligned_store<int32_t>(place + 1, 0);
+ }
+ }
+
+ static void update_null_count(AggregateDataPtr __restrict place, bool
incremental,
+ bool is_window_function) noexcept {
+ if (is_window_function && result_is_nullable) {
+ auto null_count = unaligned_load<int32_t>(place + 1);
+ incremental ? null_count++ : null_count--;
+ unaligned_store<int32_t>(place + 1, null_count);
+ }
+ }
+
+ static int32_t get_null_count(ConstAggregateDataPtr __restrict place,
+ bool is_window_function) noexcept {
+ int32_t num = 0;
+ if (is_window_function && result_is_nullable) {
+ num = unaligned_load<int32_t>(place + 1);
+ }
+ return num;
+ }
+
+public:
+ AggregateFunctionNullBaseInlineV2(IAggregateFunction* nested_function_,
+ const DataTypes& arguments, bool
is_window_function_)
+ : IAggregateFunctionHelper<Derived>(arguments),
+ nested_function {assert_cast<NestFunction*>(nested_function_)},
+ is_window_function(is_window_function_) {
+ DCHECK(nested_function_ != nullptr);
+ if constexpr (result_is_nullable) {
+ if (this->is_window_function) {
+ // flag|---null_count----|-------padding-------|--nested_data----|
+ size_t nested_align = nested_function->align_of_data();
+ prefix_size = 1 + sizeof(int32_t);
+ if (prefix_size % nested_align != 0) {
+ prefix_size += (nested_align - (prefix_size %
nested_align));
+ }
+ } else {
+ prefix_size = nested_function->align_of_data();
+ }
+ } else {
+ prefix_size = 0;
+ }
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ if constexpr (result_is_nullable) {
+ return
ColumnNullable::create(nested_function->create_serialize_column(),
+ ColumnUInt8::create());
+ }
+ return nested_function->create_serialize_column();
+ }
+
+ DataTypePtr get_serialized_type() const override {
+ if constexpr (result_is_nullable) {
+ return make_nullable(nested_function->get_serialized_type());
+ }
+ return nested_function->get_serialized_type();
+ }
+
+ void set_query_context(QueryContext* ctx) override {
+ return nested_function->set_query_context(ctx);
+ }
+
+ bool is_blockable() const override { return
nested_function->is_blockable(); }
+
+ void set_version(const int version_) override {
+ IAggregateFunctionHelper<Derived>::set_version(version_);
+ nested_function->set_version(version_);
+ }
+
+ String get_name() const override { return "NullableV2(" +
nested_function->get_name() + ")"; }
+
+ DataTypePtr get_return_type() const override {
+ return result_is_nullable ?
make_nullable(nested_function->get_return_type())
+ : nested_function->get_return_type();
+ }
+
+ void create(AggregateDataPtr __restrict place) const override {
+ init(place, this->is_window_function);
+ nested_function->create(nested_place(place));
+ }
+
+ void destroy(AggregateDataPtr __restrict place) const noexcept override {
+ nested_function->destroy(nested_place(place));
+ }
+ void reset(AggregateDataPtr place) const override {
+ init(place, this->is_window_function);
+ nested_function->reset(nested_place(place));
+ }
+
+ bool is_trivial() const override { return false; }
+
+ size_t size_of_data() const override { return prefix_size +
nested_function->size_of_data(); }
+
+ size_t align_of_data() const override {
+ if (this->is_window_function && result_is_nullable) {
+ return std::max(nested_function->align_of_data(),
alignof(int32_t));
+ } else {
+ return nested_function->align_of_data();
+ }
+ }
+
+ void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+ Arena& arena) const override {
+ if (get_flag(rhs)) {
+ set_flag(place);
+ nested_function->merge(nested_place(place), nested_place(rhs),
arena);
+ }
+ }
+
+ void serialize(ConstAggregateDataPtr __restrict place, BufferWritable&
buf) const override {
+ bool flag = get_flag(place);
+ if constexpr (result_is_nullable) {
+ buf.write_binary(flag);
+ }
+ if (flag) {
+ nested_function->serialize(nested_place(place), buf);
+ }
+ }
+
+ void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+ Arena& arena) const override {
+ bool flag = true;
+ if constexpr (result_is_nullable) {
+ buf.read_binary(flag);
+ }
+ if (flag) {
+ set_flag(place);
+ nested_function->deserialize(nested_place(place), buf, arena);
+ }
+ }
+
+    /// Serializes `num_rows` states into `dst`.
+    /// With a nullable result `dst` is a ColumnNullable: its null map is resized to
+    /// `num_rows` and filled with the negated flag of every state, and the nested
+    /// function serializes into the nested column using `offset + prefix_size`.
+    /// Otherwise the call is forwarded unchanged.
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->serialize_to_column(places, offset, dst, num_rows);
+        } else {
+            auto& dst_nullable = assert_cast<ColumnNullable&>(*dst);
+            MutableColumnPtr nested_dst = dst_nullable.get_nested_column().assume_mutable();
+            auto& flags = dst_nullable.get_null_map_data();
+
+            flags.resize(num_rows);
+            uint8_t* __restrict flag_out = flags.data();
+            for (size_t row = 0; row < num_rows; ++row) {
+                flag_out[row] = !get_flag(places[row] + offset);
+            }
+            nested_function->serialize_to_column(places, offset + prefix_size, nested_dst,
+                                                 num_rows);
+        }
+    }
+
+    /// Streaming pre-aggregation: turns every input row into its own serialized state.
+    ///
+    /// columns[0] is read as a ColumnNullable. One temporary nested state per row is
+    /// laid out in a local buffer, non-null rows are added to their state, and all
+    /// states are serialized into `dst`. With a nullable result the source null map is
+    /// appended to the destination null map and the nested column receives the states.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena& arena) const override {
+        const auto* src_nullable_col = assert_cast<const ColumnNullable*>(columns[0]);
+        const auto* __restrict src_null_map_data = src_nullable_col->get_null_map_data().data();
+
+        // One nested state per input row, packed back to back in places_data.
+        const size_t nested_size = nested_function->size_of_data();
+        std::vector<AggregateDataPtr> nested_places(num_rows);
+        std::vector<char> places_data(num_rows * nested_size);
+        for (size_t i = 0; i < num_rows; ++i) {
+            nested_places[i] = places_data.data() + i * nested_size;
+        }
+
+        // Row counters are size_t so they match the type of num_rows.
+        if (!nested_function->is_trivial()) {
+            for (size_t i = 0; i < num_rows; ++i) {
+                try {
+                    nested_function->create(nested_places[i]);
+                } catch (...) {
+                    // Unwind the states that were already constructed before rethrowing.
+                    for (size_t j = 0; j < i; ++j) {
+                        nested_function->destroy(nested_places[j]);
+                    }
+                    throw;
+                }
+            }
+        }
+        Defer destroy_places = {[&]() {
+            if (!nested_function->is_trivial()) {
+                for (size_t i = 0; i < num_rows; ++i) {
+                    nested_function->destroy(nested_places[i]);
+                }
+            }
+        }};
+
+        // Take the nested column's address directly, as the other methods of this file do.
+        const IColumn* src_nested_column = &src_nullable_col->get_nested_column();
+        if (src_nullable_col->has_null()) {
+            for (size_t i = 0; i < num_rows; ++i) {
+                if (!src_null_map_data[i]) {
+                    nested_function->add(nested_places[i], &src_nested_column, i, arena);
+                }
+            }
+        } else {
+            nested_function->add_batch(num_rows, nested_places.data(), 0, &src_nested_column,
+                                       arena, false);
+        }
+
+        if constexpr (result_is_nullable) {
+            auto& dst_nullable_col = assert_cast<ColumnNullable&>(*dst);
+            MutableColumnPtr nested_col_ptr = dst_nullable_col.get_nested_column().assume_mutable();
+            dst_nullable_col.get_null_map_column().insert_range_from(
+                    src_nullable_col->get_null_map_column(), 0, num_rows);
+            nested_function->serialize_to_column(nested_places, 0, nested_col_ptr, num_rows);
+        } else {
+            nested_function->serialize_to_column(nested_places, 0, dst, num_rows);
+        }
+    }
+
+    /// Appends the single state at `place` to `to`.
+    /// With a nullable result `to` is a ColumnNullable: a set flag appends the nested
+    /// state plus a 0 in the null map, an unset flag appends a default nested value
+    /// plus a 1. Otherwise the nested function writes straight into `to`.
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         IColumn& to) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->serialize_without_key_to_column(nested_place(place), to);
+        } else {
+            auto& to_nullable = assert_cast<ColumnNullable&>(to);
+            auto& nested_to = to_nullable.get_nested_column();
+            auto& flags = to_nullable.get_null_map_data();
+
+            if (!get_flag(place)) {
+                nested_to.insert_default();
+                flags.push_back(1);
+                return;
+            }
+            nested_function->serialize_without_key_to_column(nested_place(place), nested_to);
+            flags.push_back(0);
+        }
+    }
+
+    /// Merges `num_rows` serialized states from `column` into the matching `places`.
+    /// With a nullable result `column` is a ColumnNullable: the byte at
+    /// `places[i] + offset` is OR-ed with the negated null-map entry of row i, then the
+    /// nested function merges from the nested column using `offset + prefix_size`.
+    /// Otherwise the call is forwarded unchanged.
+    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
+                                   AggregateDataPtr rhs, const IColumn* column, Arena& arena,
+                                   const size_t num_rows) const override {
+        if constexpr (!result_is_nullable) {
+            this->nested_function->deserialize_and_merge_vec(places, offset, rhs, column, arena,
+                                                             num_rows);
+        } else {
+            const auto& src_nullable = assert_cast<const ColumnNullable&>(*column);
+            const auto* __restrict src_null_map = src_nullable.get_null_map_data().data();
+
+            for (size_t row = 0; row < num_rows; ++row) {
+                *(places[row] + offset) |= (!src_null_map[row]);
+            }
+
+            const auto& nested_src = src_nullable.get_nested_column();
+            nested_function->deserialize_and_merge_vec(places, offset + prefix_size, rhs,
+                                                       &nested_src, arena, num_rows);
+        }
+    }
+
+    /// "Selected" variant of deserialize_and_merge_vec.
+    /// With a nullable result `column` is a ColumnNullable: the byte at
+    /// `places[i] + offset` is OR-ed with the negated null-map entry of row i, then the
+    /// nested function merges from the nested column using `offset + prefix_size`.
+    /// Otherwise the call is forwarded unchanged.
+    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
+                                            AggregateDataPtr rhs, const IColumn* column,
+                                            Arena& arena, const size_t num_rows) const override {
+        if constexpr (result_is_nullable) {
+            const auto& nullable_col = assert_cast<const ColumnNullable&>(*column);
+            const auto& nested_col = nullable_col.get_nested_column();
+            const auto* __restrict null_map_data = nullable_col.get_null_map_data().data();
+
+            for (size_t i = 0; i < num_rows; ++i) {
+                // NOTE(review): the selected variants appear to allow null entries in
+                // `places` for rows that were filtered out -- confirm against the
+                // helper's default implementation. Such rows have no state to flag, and
+                // forming `nullptr + offset` must be avoided.
+                if (places[i] == nullptr) {
+                    continue;
+                }
+                *(places[i] + offset) |= (!null_map_data[i]);
+            }
+            nested_function->deserialize_and_merge_vec_selected(places, offset + prefix_size, rhs,
+                                                                &nested_col, arena, num_rows);
+        } else {
+            this->nested_function->deserialize_and_merge_vec_selected(places, offset, rhs, column,
+                                                                      arena, num_rows);
+        }
+    }
+
+    /// Merges the serialized states of rows [begin, end] of `column` into `place`.
+    /// With a nullable result `column` is a ColumnNullable and every non-null row sets
+    /// the flag and is merged one row at a time through the nested function.
+    /// Otherwise the whole range is forwarded to the nested function.
+    ///
+    /// NOTE(review): the loop below is inclusive of `end`, while the DCHECK admits
+    /// `end == column.size()` -- confirm callers never pass that value.
+    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
+                                                 const IColumn& column, size_t begin, size_t end,
+                                                 Arena& arena) const override {
+        DCHECK(end <= column.size() && begin <= end)
+                << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
+
+        if constexpr (!result_is_nullable) {
+            nested_function->deserialize_and_merge_from_column_range(place, column, begin, end,
+                                                                     arena);
+        } else {
+            const auto& src_nullable = assert_cast<const ColumnNullable&>(column);
+            const auto& nested_src = src_nullable.get_nested_column();
+            const auto& src_null_map = src_nullable.get_null_map_data();
+
+            for (size_t row = begin; row <= end; ++row) {
+                if (src_null_map[row]) {
+                    continue;
+                }
+                set_flag(place);
+                nested_function->deserialize_and_merge_from_column_range(nested_place(place),
+                                                                         nested_src, row, row,
+                                                                         arena);
+            }
+        }
+    }
+
+    /// Appends the final result of the state at `place` to `to`.
+    /// With a nullable result `to` is a ColumnNullable: a set flag inserts the nested
+    /// result and pushes 0 onto the null map, an unset flag inserts the column's
+    /// default. Otherwise the nested function writes straight into `to`.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->insert_result_into(nested_place(place), to);
+        } else {
+            auto& to_nullable = assert_cast<ColumnNullable&>(to);
+            if (!get_flag(place)) {
+                to_nullable.insert_default();
+                return;
+            }
+            nested_function->insert_result_into(nested_place(place),
+                                                to_nullable.get_nested_column());
+            to_nullable.get_null_map_data().push_back(0);
+        }
+    }
+};
+
+/// V2 null-handling wrapper for an aggregate function with a single argument.
+///
+/// columns[0] is always read as a ColumnNullable. Non-null rows set the state's flag
+/// and are forwarded to the nested function together with the nested column; how the
+/// flag and null counter are stored is defined by the base class.
+template <typename NestFuction, bool result_is_nullable>
+class AggregateFunctionNullUnaryInlineV2 final
+        : public AggregateFunctionNullBaseInlineV2<
+                  NestFuction, result_is_nullable,
+                  AggregateFunctionNullUnaryInlineV2<NestFuction, result_is_nullable>> {
+public:
+    AggregateFunctionNullUnaryInlineV2(IAggregateFunction* nested_function_,
+                                       const DataTypes& arguments, bool is_window_function_)
+            : AggregateFunctionNullBaseInlineV2<
+                      NestFuction, result_is_nullable,
+                      AggregateFunctionNullUnaryInlineV2<NestFuction, result_is_nullable>>(
+                      nested_function_, arguments, is_window_function_) {}
+
+    /// Adds row `row_num`. A non-null row sets the flag and is forwarded to the nested
+    /// function; a null row only calls update_null_count(place, true, ...).
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena& arena) const override {
+        const auto* column =
+                assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(columns[0]);
+        if (!column->is_null_at(row_num)) {
+            this->set_flag(place);
+            const IColumn* nested_column = &column->get_nested_column();
+            this->nested_function->add(this->nested_place(place), &nested_column, row_num, arena);
+        } else {
+            this->update_null_count(place, true, this->is_window_function);
+        }
+    }
+
+    /// Builds the same wrapper around the nested function's stable counterpart, or
+    /// returns nullptr when the nested function has none.
+    IAggregateFunction* transmit_to_stable() override {
+        auto f = AggregateFunctionNullBaseInlineV2<
+                         NestFuction, result_is_nullable,
+                         AggregateFunctionNullUnaryInlineV2<NestFuction, result_is_nullable>>::
+                         nested_function->transmit_to_stable();
+        if (!f) {
+            return nullptr;
+        }
+        return new AggregateFunctionNullUnaryInlineV2<
+                typename FunctionStableTransfer<NestFuction>::FunctionStable, result_is_nullable>(
+                f, IAggregateFunction::argument_types, this->is_window_function);
+    }
+
+    /// Adds row i to `places[i] + place_offset` for every row of the batch.
+    /// Null rows are skipped. When the column has no nulls and the result is not
+    /// nullable, the whole batch is handed to the nested function's add_batch.
+    void add_batch(size_t batch_size, AggregateDataPtr* __restrict places, size_t place_offset,
+                   const IColumn** columns, Arena& arena, bool agg_many) const override {
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        const IColumn* nested_column = &column->get_nested_column();
+        if (column->has_null()) {
+            const auto* __restrict null_map_data = column->get_null_map_data().data();
+            // size_t counters match batch_size and the sibling loops in this class.
+            for (size_t i = 0; i < batch_size; ++i) {
+                if (!null_map_data[i]) {
+                    AggregateDataPtr __restrict place = places[i] + place_offset;
+                    this->set_flag(place);
+                    this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
+                }
+            }
+        } else {
+            if constexpr (result_is_nullable) {
+                for (size_t i = 0; i < batch_size; ++i) {
+                    AggregateDataPtr __restrict place = places[i] + place_offset;
+                    place[0] |= 1;
+                    this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
+                }
+            } else {
+                this->nested_function->add_batch(batch_size, places, place_offset, &nested_column,
+                                                 arena, agg_many);
+            }
+        }
+    }
+
+    /// Adds rows [0, batch_size) to the single state at `place`. With nulls present
+    /// the rows go through add() one by one; otherwise the flag is set and the nested
+    /// function consumes the whole batch.
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena& arena) const override {
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        bool has_null = column->has_null();
+
+        if (has_null) {
+            for (size_t i = 0; i < batch_size; ++i) {
+                this->add(place, columns, i, arena);
+            }
+        } else {
+            this->set_flag(place);
+            const IColumn* nested_column = &column->get_nested_column();
+            this->nested_function->add_batch_single_place(batch_size, this->nested_place(place),
+                                                          &nested_column, arena);
+        }
+    }
+
+    /// Adds rows [batch_begin, batch_end] (inclusive) to `place`. `has_null` is
+    /// supplied by the caller and selects the row-by-row path.
+    void add_batch_range(size_t batch_begin, size_t batch_end, AggregateDataPtr place,
+                         const IColumn** columns, Arena& arena, bool has_null) override {
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+
+        if (has_null) {
+            for (size_t i = batch_begin; i <= batch_end; ++i) {
+                this->add(place, columns, i, arena);
+            }
+        } else {
+            this->set_flag(place);
+            const IColumn* nested_column = &column->get_nested_column();
+            this->nested_function->add_batch_range(batch_begin, batch_end,
+                                                   this->nested_place(place), &nested_column, arena,
+                                                   false);
+        }
+    }
+
+    /// Window path: accumulates the frame clamped to the partition into `place`.
+    /// An empty clamped frame with no reusable previous result re-initialises the flag
+    /// and reports *use_null_result = true. An empty frame with a reusable previous
+    /// result falls through to the accumulation code below.
+    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
+                                Arena& arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const override {
+        auto current_frame_start = std::max<int64_t>(frame_start, partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                this->init_flag(place);
+                *use_null_result = true;
+                return;
+            }
+        } else {
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        bool has_null = column->has_null();
+        if (has_null) {
+            for (size_t i = current_frame_start; i < current_frame_end; ++i) {
+                this->add(place, columns, i, arena);
+            }
+        } else {
+            const IColumn* nested_column = &(column->get_nested_column());
+            this->set_flag(place);
+            this->nested_function->add_range_single_place(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), &nested_column, arena, use_null_result,
+                    could_use_previous_result);
+        }
+    }
+
+    /// Incremental window evaluation is supported exactly when the nested function
+    /// supports it.
+    bool supported_incremental_mode() const override {
+        return this->nested_function->supported_incremental_mode();
+    }
+
+    /// Window path with incremental frame updates.
+    ///
+    /// An empty clamped frame yields a null result. Without nulls in the column the
+    /// work is delegated to the nested function (incremental when a previous result
+    /// is reusable, a full range add otherwise). With nulls and a reusable previous
+    /// result, the null counter is adjusted for the rows at frame_start - 1 and
+    /// frame_end - 1 before delegating, and the flag is then derived from whether the
+    /// frame consists of nulls only. With nulls and no reusable result the frame is
+    /// recomputed through add_range_single_place.
+    void execute_function_with_incremental(int64_t partition_start, int64_t partition_end,
+                                           int64_t frame_start, int64_t frame_end,
+                                           AggregateDataPtr place, const IColumn** columns,
+                                           Arena& arena, bool previous_is_nul, bool end_is_nul,
+                                           bool has_null, UInt8* use_null_result,
+                                           UInt8* could_use_previous_result) const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            this->init_flag(place);
+            return;
+        }
+
+        DCHECK(columns[0]->is_nullable()) << columns[0]->get_name();
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        const IColumn* nested_column = &column->get_nested_column();
+
+        if (!column->has_null()) {
+            if (*could_use_previous_result) {
+                this->nested_function->execute_function_with_incremental(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, previous_is_nul,
+                        end_is_nul, false, use_null_result, could_use_previous_result);
+            } else {
+                this->nested_function->add_range_single_place(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, use_null_result,
+                        could_use_previous_result);
+            }
+            this->set_flag(place);
+            return;
+        }
+
+        const auto* __restrict null_map_data = column->get_null_map_data().data();
+        if (*could_use_previous_result) {
+            // Presumably the row that just left the frame and the row that just
+            // entered it relative to the previous call -- confirm against the caller.
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            bool is_previous_frame_start_null = false;
+            if (outcoming_pos >= partition_start && outcoming_pos < partition_end &&
+                null_map_data[outcoming_pos] == 1) {
+                is_previous_frame_start_null = true;
+                DCHECK_EQ(result_is_nullable, true);
+                DCHECK_EQ(this->is_window_function, true);
+                this->update_null_count(place, false, this->is_window_function);
+            }
+            bool is_current_frame_end_null = false;
+            if (incoming_pos >= partition_start && incoming_pos < partition_end &&
+                null_map_data[incoming_pos] == 1) {
+                is_current_frame_end_null = true;
+                DCHECK_EQ(result_is_nullable, true);
+                DCHECK_EQ(this->is_window_function, true);
+                this->update_null_count(place, true, this->is_window_function);
+            }
+            // The nested function receives the nested column plus the null map column.
+            const IColumn* columns_tmp[2] {nested_column, &(*column->get_null_map_column_ptr())};
+            this->nested_function->execute_function_with_incremental(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), columns_tmp, arena, is_previous_frame_start_null,
+                    is_current_frame_end_null, true, use_null_result, could_use_previous_result);
+            DCHECK_EQ(result_is_nullable, true);
+            DCHECK_EQ(this->is_window_function, true);
+            // A frame made up solely of nulls clears the flag; anything else sets it.
+            if (current_frame_end - current_frame_start ==
+                this->get_null_count(place, this->is_window_function)) {
+                this->init_flag(place);
+            } else {
+                this->set_flag(place);
+            }
+        } else {
+            this->add_range_single_place(partition_start, partition_end, frame_start, frame_end,
+                                         place, columns, arena, use_null_result,
+                                         could_use_previous_result);
+        }
+    }
+};
+
+} // namespace doris::vectorized
+
+#include "common/compile_check_end.h"
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 62822288860..2054886b16d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -134,7 +134,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
+ size_t num_rows) const {
auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
auto* data = col.get_data().data();
memcpy(places, data, sizeof(Data) * num_rows);
@@ -168,16 +168,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
- const size_t num_rows = column.size();
- auto* data = reinterpret_cast<const Data*>(col.get_data().data());
- for (size_t i = 0; i != num_rows; ++i) {
- this->data(place).sum += data[i].sum;
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git
a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
index 536a64af602..1c27c37b5ac 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
@@ -165,7 +165,7 @@ public:
}
void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena&,
- size_t num_rows) const override {
+ size_t num_rows) const {
auto data = reinterpret_cast<const UInt64*>(
assert_cast<const
ColumnFixedLengthObject&>(column).get_data().data());
for (size_t i = 0; i != num_rows; ++i) {
@@ -199,16 +199,6 @@ public:
}
}
- void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
- Arena&) const override {
- auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
- const size_t num_rows = column.size();
- auto* data = reinterpret_cast<const UInt64*>(col.get_data().data());
- for (size_t i = 0; i != num_rows; ++i) {
- AggregateFunctionUniqDistributeKey::data(place).count += data[i];
- }
- }
-
void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict
place,
const IColumn& column, size_t
begin, size_t end,
Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/helpers.h
b/be/src/vec/aggregate_functions/helpers.h
index 2317925de7b..5762aa7ad4d 100644
--- a/be/src/vec/aggregate_functions/helpers.h
+++ b/be/src/vec/aggregate_functions/helpers.h
@@ -23,6 +23,7 @@
#include "runtime/define_primitive_type.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_null.h"
+#include "vec/aggregate_functions/aggregate_function_null_v2.h"
#include "vec/core/call_on_type_index.h"
#include "vec/data_types/data_type.h"
#include "vec/utils/template_helpers.hpp"
@@ -68,12 +69,13 @@
decltype(&IAggregateFunctionHelper<
\
FunctionTemplate>::serialize_without_key_to_column)>, \
"need to override serialize_without_key_to_column");
\
- static_assert(!std::is_same_v<
\
-
decltype(&FunctionTemplate::deserialize_and_merge_from_column), \
- decltype(&IAggregateFunctionHelper<
\
-
FunctionTemplate>::deserialize_and_merge_from_column)>, \
- "need to override "
\
- "deserialize_and_merge_from_column");
\
+        /* Both operands must name the same member: when FunctionTemplate does not    */ \
+        /* override it, the two pointer-to-member types are identical and the assert  */ \
+        /* fires.                                                                     */ \
+        static_assert(                                                                     \
+                !std::is_same_v<                                                           \
+                        decltype(&FunctionTemplate::deserialize_and_merge_from_column_range), \
+                        decltype(&IAggregateFunctionHelper<                                \
+                                 FunctionTemplate>::deserialize_and_merge_from_column_range)>, \
+                "need to override "                                                        \
+                "deserialize_and_merge_from_column_range");                                \
}
\
} while (false)
@@ -85,6 +87,11 @@ struct creator_without_type {
using NullableT = std::conditional_t<multi_arguments,
AggregateFunctionNullVariadicInline<T, f>,
AggregateFunctionNullUnaryInline<T,
f>>;
+    // Wrapper selected when the V2 null handling is enabled: single-argument functions
+    // use AggregateFunctionNullUnaryInlineV2, multi-argument functions use
+    // AggregateFunctionNullVariadicInline.
+    template <bool multi_arguments, bool f, typename T>
+    using NullableV2T =
+            typename std::conditional<multi_arguments, AggregateFunctionNullVariadicInline<T, f>,
+                                      AggregateFunctionNullUnaryInlineV2<T, f>>::type;
+
template <typename AggregateFunctionTemplate>
static AggregateFunctionPtr creator(const std::string& name, const
DataTypes& argument_types,
const DataTypePtr& result_type,
@@ -142,9 +149,15 @@ struct creator_without_type {
if (have_nullable(argument_types_)) {
std::visit(
[&](auto multi_arguments, auto result_is_nullable) {
- result.reset(new NullableT<multi_arguments,
result_is_nullable,
- AggregateFunctionTemplate>(
- result.release(), argument_types_,
attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<multi_arguments,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<multi_arguments,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
},
make_bool_variant(argument_types_.size() > 1),
make_bool_variant(result_is_nullable));
@@ -166,11 +179,21 @@ struct creator_without_type {
std::forward<TArgs>(args)...,
remove_nullable(argument_types_)));
if (have_nullable(argument_types_)) {
if (argument_types_.size() > 1) {
- result.reset(new NullableT<true, false,
AggregateFunctionTemplate>(
- result.release(), argument_types_,
attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<true, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<true, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
} else {
- result.reset(new NullableT<false, false,
AggregateFunctionTemplate>(
- result.release(), argument_types_,
attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<false, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<false, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
}
}
@@ -192,10 +215,15 @@ struct creator_without_type {
if (have_nullable(argument_types_)) {
std::visit(
[&](auto result_is_nullable) {
- result.reset(
- new NullableT<true, result_is_nullable,
AggregateFunctionTemplate>(
- result.release(), argument_types_,
- attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<true,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<true,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
},
make_bool_variant(result_is_nullable));
}
@@ -220,8 +248,13 @@ struct creator_without_type {
std::unique_ptr<IAggregateFunction>
result(std::make_unique<AggregateFunctionTemplate>(
std::forward<TArgs>(args)...,
remove_nullable(argument_types_)));
if (have_nullable(argument_types_)) {
- result.reset(new NullableT<true, false, AggregateFunctionTemplate>(
- result.release(), argument_types_,
attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<true, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<true, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
}
CHECK_AGG_FUNCTION_SERIALIZED_TYPE(AggregateFunctionTemplate);
return AggregateFunctionPtr(result.release());
@@ -241,10 +274,15 @@ struct creator_without_type {
if (have_nullable(argument_types_)) {
std::visit(
[&](auto result_is_nullable) {
- result.reset(
- new NullableT<false, result_is_nullable,
AggregateFunctionTemplate>(
- result.release(), argument_types_,
- attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<false,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<false,
result_is_nullable,
+
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
},
make_bool_variant(result_is_nullable));
}
@@ -268,8 +306,13 @@ struct creator_without_type {
std::unique_ptr<IAggregateFunction>
result(std::make_unique<AggregateFunctionTemplate>(
std::forward<TArgs>(args)...,
remove_nullable(argument_types_)));
if (have_nullable(argument_types_)) {
- result.reset(new NullableT<false, false,
AggregateFunctionTemplate>(
- result.release(), argument_types_,
attr.is_window_function));
+ if (attr.enable_aggregate_function_null_v2) {
+ result.reset(new NullableV2T<false, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ } else {
+ result.reset(new NullableT<false, false,
AggregateFunctionTemplate>(
+ result.release(), argument_types_,
attr.is_window_function));
+ }
}
CHECK_AGG_FUNCTION_SERIALIZED_TYPE(AggregateFunctionTemplate);
return AggregateFunctionPtr(result.release());
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 7da847d8d63..22ea6ccff4d 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -217,6 +217,8 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const
RowDescriptor& desc,
state->be_exec_version(),
{.is_window_function = _is_window_function,
.is_foreach = is_foreach,
+ .enable_aggregate_function_null_v2 =
+ state->enable_aggregate_function_null_v2(),
.column_names = std::move(column_names)});
} else {
_function = AggregateFunctionSimpleFactory::instance().get(
@@ -224,6 +226,8 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const
RowDescriptor& desc,
state->be_exec_version(),
{.is_window_function = _is_window_function,
.is_foreach = is_foreach,
+ .enable_aggregate_function_null_v2 =
+ state->enable_aggregate_function_null_v2(),
.column_names = std::move(column_names)});
}
}
@@ -285,13 +289,6 @@ Status AggFnEvaluator::execute_batch_add_selected(Block*
block, size_t offset,
return Status::OK();
}
-Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable&
buf,
- const size_t num_rows, Arena&
arena) {
- RETURN_IF_ERROR(_calc_argument_columns(block));
- _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows,
arena);
- return Status::OK();
-}
-
Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block,
MutableColumnPtr& dst,
const size_t
num_rows, Arena& arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index 52555054d35..a5e75fda192 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -82,9 +82,6 @@ public:
Status execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places,
Arena& arena);
- Status streaming_agg_serialize(Block* block, BufferWritable& buf, const
size_t num_rows,
- Arena& arena);
-
Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr&
dst,
const size_t num_rows, Arena&
arena);
diff --git a/be/test/vec/aggregate_functions/agg_function_test.h
b/be/test/vec/aggregate_functions/agg_function_test.h
index ea3f67d940a..47577d58a45 100644
--- a/be/test/vec/aggregate_functions/agg_function_test.h
+++ b/be/test/vec/aggregate_functions/agg_function_test.h
@@ -163,18 +163,6 @@ private:
std::vector<AggregateDataPtr> places {place};
agg_fn->function()->serialize_to_column(places, 0,
serialize_column, 1);
}
-
- {
- Arena arena;
- auto* place = reinterpret_cast<vectorized::AggregateDataPtr>(
- arena.alloc(agg_fn->function()->size_of_data()));
-
- agg_fn->create(place);
- Defer defer([&]() { agg_fn->destroy(place); });
- agg_fn->function()->deserialize_from_column(place,
*serialize_column, arena, 1);
-
- check_result(place);
- }
}
{ // streaming_agg_serialize_to_column
deserialize_and_merge_from_column_range
diff --git a/be/test/vec/aggregate_functions/agg_min_max_test.cpp
b/be/test/vec/aggregate_functions/agg_min_max_test.cpp
index 54ac10cd705..79467129f9c 100644
--- a/be/test/vec/aggregate_functions/agg_min_max_test.cpp
+++ b/be/test/vec/aggregate_functions/agg_min_max_test.cpp
@@ -112,12 +112,19 @@ TEST_P(AggMinMaxTest, min_max_decimal_test) {
agg_function->streaming_agg_serialize_to_column(column, dst,
agg_test_batch_size, arena);
std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data() *
agg_test_batch_size]);
- AggregateDataPtr places = memory2.get();
- agg_function->deserialize_from_column(places, *dst, arena,
agg_test_batch_size);
+ std::unique_ptr<char[]> memory2_tmp(
+ new char[agg_function->size_of_data() * agg_test_batch_size]);
+ std::vector<AggregateDataPtr> places(agg_test_batch_size);
+ for (size_t i = 0; i != agg_test_batch_size; ++i) {
+ places[i] = memory2.get() + agg_function->size_of_data() * i;
+ agg_function->create(places[i]);
+ }
+ agg_function->deserialize_and_merge_vec(places.data(), 0,
memory2_tmp.get(), dst.get(), arena,
+ agg_test_batch_size);
ColumnDecimal128V2 result(0, 9);
for (size_t i = 0; i != agg_test_batch_size; ++i) {
- agg_function->insert_result_into(places + agg_function->size_of_data()
* i, result);
+ agg_function->insert_result_into(places[i], result);
}
for (size_t i = 0; i != agg_test_batch_size; ++i) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4ef012cb20a..75504d3b76e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -312,6 +312,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";
+ public static final String ENABLE_AGGREGATE_FUNCTION_NULL_V2 =
"enable_aggregate_function_null_v2";
public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD =
"enable_new_shuffle_hash_method";
@@ -1478,6 +1479,9 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true)
private boolean enableParallelScan = true;
+ @VariableMgr.VarAttr(name = ENABLE_AGGREGATE_FUNCTION_NULL_V2, fuzzy =
true, needForward = true)
+ private boolean enableAggregateFunctionNullV2 = true;
+
@VariableMgr.VarAttr(name = OPTIMIZE_INDEX_SCAN_PARALLELISM,
needForward = true,
description = {"优化索引扫描时的 Scan 并行度,该优化目前只对 ann topn 查询生效",
@@ -5113,6 +5117,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth);
tResult.setEnableParallelScan(enableParallelScan);
+
tResult.setEnableAggregateFunctionNullV2(enableAggregateFunctionNullV2);
tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setOptimizeIndexScanParallelism(optimizeIndexScanParallelism);
@@ -5513,6 +5518,10 @@ public class SessionVariable implements Serializable,
Writable {
return enableParallelScan;
}
+    /** Returns the current value of the enableAggregateFunctionNullV2 field. */
+    public boolean getEnableAggregateFunctionNullV2() {
+        return this.enableAggregateFunctionNullV2;
+    }
+
public boolean enableParallelResultSink() {
return enableParallelResultSink;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index dbfc8f2de76..dd7c0b0c453 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -425,6 +425,7 @@ struct TQueryOptions {
184: optional i32 cte_max_recursion_depth;
185: optional bool enable_parquet_file_page_cache = true;
+ 186: optional bool enable_aggregate_function_null_v2 = false; // NOTE(review): field id 186 is also used by enable_streaming_agg_force_passthrough on the next line -- confirm the intended id
186: optional bool enable_streaming_agg_force_passthrough;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]