This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpc_preview6 by this push:
     new af225207951 support aggregate_function_null_v2
af225207951 is described below

commit af22520795162d0ec3adfb96be37dfcd626ed08a
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Jan 22 19:48:33 2026 +0800

    support aggregate_function_null_v2
    
    update
    
    fix
    
    format
    
    update
    
    update
    
    update
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  10 -
 .../pipeline/exec/aggregation_source_operator.cpp  |   3 -
 be/src/runtime/runtime_state.h                     |   5 +
 .../vec/aggregate_functions/aggregate_function.h   |  58 +-
 .../aggregate_function_array_agg.h                 |  15 -
 .../aggregate_functions/aggregate_function_avg.h   |  16 +-
 .../aggregate_function_bitmap.h                    |  11 -
 .../aggregate_function_bitmap_agg.h                |  22 -
 .../aggregate_functions/aggregate_function_count.h |  24 +-
 .../aggregate_functions/aggregate_function_map.h   |  20 -
 .../aggregate_function_map_v2.h                    |  20 -
 .../aggregate_function_min_max.h                   |  19 +-
 .../aggregate_function_null_v2.h                   | 598 +++++++++++++++++++++
 .../aggregate_functions/aggregate_function_sum.h   |  12 +-
 .../aggregate_function_uniq_distribute_key.h       |  12 +-
 be/src/vec/aggregate_functions/helpers.h           |  93 +++-
 be/src/vec/exprs/vectorized_agg_fn.cpp             |  11 +-
 be/src/vec/exprs/vectorized_agg_fn.h               |   3 -
 .../vec/aggregate_functions/agg_function_test.h    |  12 -
 .../vec/aggregate_functions/agg_min_max_test.cpp   |  13 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 22 files changed, 724 insertions(+), 263 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9239e13f7d4..db684d4c35f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -302,9 +302,6 @@ Status 
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
                 int col_id = AggSharedState::get_slot_column_id(
                         Base::_shared_state->aggregate_evaluators[i]);
                 auto column = block->get_by_position(col_id).column;
-                if (column->is_nullable()) {
-                    column = 
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
-                }
 
                 size_t buffer_size =
                         
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() *
@@ -354,10 +351,6 @@ Status 
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
                                 Base::_shared_state->aggregate_evaluators[i]);
                     }
                     auto column = block->get_by_position(col_id).column;
-                    if (column->is_nullable()) {
-                        column = ((vectorized::ColumnNullable*)column.get())
-                                         ->get_nested_column_ptr();
-                    }
 
                     size_t buffer_size = 
Base::_shared_state->aggregate_evaluators[i]
                                                  ->function()
@@ -412,9 +405,6 @@ Status 
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
             int col_id = AggSharedState::get_slot_column_id(
                     Base::_shared_state->aggregate_evaluators[i]);
             auto column = block->get_by_position(col_id).column;
-            if (column->is_nullable()) {
-                column = 
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
-            }
 
             SCOPED_TIMER(_deserialize_data_timer);
             Base::_shared_state->aggregate_evaluators[i]
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index b8ecd137016..846bfdf1c12 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -500,9 +500,6 @@ Status 
AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
         auto col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
         auto column = block->get_by_position(col_id).column;
-        if (column->is_nullable()) {
-            column = 
((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
-        }
 
         size_t buffer_size =
                 
Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() * rows;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 86ccb003279..aa4f6f10b50 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -570,6 +570,11 @@ public:
         return _query_options.__isset.enable_parallel_scan && 
_query_options.enable_parallel_scan;
     }
 
+    bool enable_aggregate_function_null_v2() const {
+        return _query_options.__isset.enable_aggregate_function_null_v2 &&
+               _query_options.enable_aggregate_function_null_v2;
+    }
+
     bool is_read_csv_empty_line_as_null() const {
         return _query_options.__isset.read_csv_empty_line_as_null &&
                _query_options.read_csv_empty_line_as_null;
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index cdcef2f248f..a0181210ac2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -46,6 +46,7 @@ class IDataType;
 struct AggregateFunctionAttr {
     bool is_window_function {false};
     bool is_foreach {false};
+    bool enable_aggregate_function_null_v2 {false};
     std::vector<std::string> column_names;
 };
 
@@ -131,12 +132,13 @@ public:
     virtual void merge(AggregateDataPtr __restrict place, 
ConstAggregateDataPtr rhs,
                        Arena&) const = 0;
 
-    virtual void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
-                           Arena&, const size_t num_rows) const = 0;
+    virtual void merge_vec(const AggregateDataPtr __restrict* __restrict 
places, size_t offset,
+                           ConstAggregateDataPtr __restrict rhs, Arena&,
+                           const size_t num_rows) const = 0;
 
     // same as merge_vec, but only call "merge" function when place is not 
nullptr
-    virtual void merge_vec_selected(const AggregateDataPtr* places, size_t 
offset,
-                                    ConstAggregateDataPtr rhs, Arena&,
+    virtual void merge_vec_selected(const AggregateDataPtr __restrict* 
__restrict places,
+                                    size_t offset, ConstAggregateDataPtr 
__restrict rhs, Arena&,
                                     const size_t num_rows) const = 0;
 
     /// Serializes state (to transmit it over the network, for example).
@@ -166,9 +168,6 @@ public:
                                                     AggregateDataPtr rhs, 
const IColumn* column,
                                                     Arena&, const size_t 
num_rows) const = 0;
 
-    virtual void deserialize_from_column(AggregateDataPtr places, const 
IColumn& column, Arena&,
-                                         size_t num_rows) const = 0;
-
     /// Deserializes state and merge it with current aggregation function.
     virtual void deserialize_and_merge(AggregateDataPtr __restrict place,
                                        AggregateDataPtr __restrict rhs, 
BufferReadable& buf,
@@ -178,8 +177,16 @@ public:
                                                          const IColumn& 
column, size_t begin,
                                                          size_t end, Arena&) 
const = 0;
 
-    virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict 
place,
-                                                   const IColumn& column, 
Arena&) const = 0;
+    // Deserializes every state stored in `column` and merges it into `place`
+    // by delegating to deserialize_and_merge_from_column_range() with the
+    // index range [0, column.size() - 1]. An empty column is skipped up
+    // front: `column.size() - 1` is unsigned and would wrap to SIZE_MAX.
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena& arena) const {
+        if (column.empty()) {
+            return;
+        }
+        deserialize_and_merge_from_column_range(place, column, 0, 
column.size() - 1, arena);
+    }
 
     /// Inserts results into a column.
     // todo: Consider whether this passes a ConstAggregateDataPtr
@@ -218,9 +219,6 @@ public:
                                         Arena& arena, UInt8* use_null_result,
                                         UInt8* could_use_previous_result) 
const = 0;
 
-    virtual void streaming_agg_serialize(const IColumn** columns, 
BufferWritable& buf,
-                                         const size_t num_rows, Arena&) const 
= 0;
-
     virtual void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
                                                    const size_t num_rows, 
Arena&) const = 0;
 
@@ -309,6 +307,9 @@ public:
 
     void destroy_vec(AggregateDataPtr __restrict place,
                      const size_t num_rows) const noexcept override {
+        if (is_trivial()) {
+            return;
+        }
         const size_t size_of_data_ = size_of_data();
         const Derived* derived = assert_cast<const Derived*>(this);
         for (size_t i = 0; i != num_rows; ++i) {
@@ -419,8 +420,9 @@ public:
         serialize_vec(places, offset, writer, num_rows);
     }
 
-    void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
-                                 const size_t num_rows, Arena& arena) const 
override {
+    void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena& 
arena) const override {
+        VectorBufferWriter buf(assert_cast<ColumnString&>(*dst));
         std::vector<char> place(size_of_data());
         const Derived* derived = assert_cast<const Derived*>(this);
         for (size_t i = 0; i != num_rows; ++i) {
@@ -432,12 +434,6 @@ public:
         }
     }
 
-    void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
-                                           const size_t num_rows, Arena& 
arena) const override {
-        VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
-        streaming_agg_serialize(columns, writer, num_rows, arena);
-    }
-
     void serialize_without_key_to_column(ConstAggregateDataPtr __restrict 
place,
                                          IColumn& to) const override {
         VectorBufferWriter writter(assert_cast<ColumnString&>(to));
@@ -516,13 +512,9 @@ public:
         derived->destroy_vec(rhs, num_rows);
     }
 
-    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena& arena,
-                                 size_t num_rows) const override {
-        deserialize_vec(places, assert_cast<const ColumnString*>(&column), 
arena, num_rows);
-    }
-
-    void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
-                   Arena& arena, const size_t num_rows) const override {
+    void merge_vec(const AggregateDataPtr __restrict* __restrict places, 
size_t offset,
+                   ConstAggregateDataPtr __restrict rhs, Arena& arena,
+                   const size_t num_rows) const override {
         const auto* derived = assert_cast<const Derived*>(this);
         const auto size_of_data = derived->size_of_data();
         for (size_t i = 0; i != num_rows; ++i) {
@@ -530,8 +522,8 @@ public:
         }
     }
 
-    void merge_vec_selected(const AggregateDataPtr* places, size_t offset,
-                            ConstAggregateDataPtr rhs, Arena& arena,
+    void merge_vec_selected(const AggregateDataPtr __restrict* __restrict 
places, size_t offset,
+                            ConstAggregateDataPtr __restrict rhs, Arena& arena,
                             const size_t num_rows) const override {
         const auto* derived = assert_cast<const Derived*>(this);
         const auto size_of_data = derived->size_of_data();
@@ -561,14 +553,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena& arena) const override {
-        if (column.empty()) {
-            return;
-        }
-        deserialize_and_merge_from_column_range(place, column, 0, 
column.size() - 1, arena);
-    }
-
     void deserialize_and_merge(AggregateDataPtr __restrict place, 
AggregateDataPtr __restrict rhs,
                                BufferReadable& buf, Arena& arena) const 
override {
         assert_cast<const Derived*, 
TypeCheckOnRelease::DISABLE>(this)->deserialize(rhs, buf,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
index 9831c140e51..439c18d5ac6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_array_agg.h
@@ -324,14 +324,6 @@ public:
         this->data(place).insert_result_into(to);
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena& arena) const override {
-        const size_t num_rows = column.size();
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(place).deserialize_and_merge(column, i);
-        }
-    }
-
     void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t 
offset,
                                    AggregateDataPtr rhs, const IColumn* 
column, Arena& arena,
                                    const size_t num_rows) const override {
@@ -340,13 +332,6 @@ public:
         }
     }
 
-    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena& arena,
-                                 size_t num_rows) const override {
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(places).deserialize_and_merge(column, i);
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena& arena) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h 
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index ea2060b3f5b..8e2f3a8d67c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -235,7 +235,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
         DCHECK(col.size() >= num_rows) << "source column's size should greater 
than num_rows";
         auto* data = col.get_data().data();
@@ -268,20 +268,6 @@ public:
         }
     }
 
-    NO_SANITIZE_UNDEFINED void 
deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
-                                                                 const 
IColumn& column,
-                                                                 Arena&) const 
override {
-        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
-        const size_t num_rows = column.size();
-        DCHECK(col.size() >= num_rows) << "source column's size should greater 
than num_rows";
-        auto* data = reinterpret_cast<const Data*>(col.get_data().data());
-
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(place).sum += data[i].sum;
-            this->data(place).count += data[i].count;
-        }
-    }
-
     NO_SANITIZE_UNDEFINED void deserialize_and_merge_from_column_range(
             AggregateDataPtr __restrict place, const IColumn& column, size_t 
begin, size_t end,
             Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h 
b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
index a17f1a42ffb..18a1d0480c5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
@@ -187,17 +187,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        const auto& col = assert_cast<const ColumnBitmap&>(column);
-        const size_t num_rows = column.size();
-        const auto* data = col.get_data().data();
-
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(place).merge(data[i]);
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
index 473b557f70d..b0cf1c6aa8d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
@@ -146,17 +146,6 @@ public:
         }
     }
 
-    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
-        auto& col = assert_cast<const ColumnBitmap&>(column);
-        DCHECK(col.size() >= num_rows) << "source column's size should greater 
than num_rows";
-        auto* src = col.get_data().data();
-        auto* data = &(this->data(places));
-        for (size_t i = 0; i != num_rows; ++i) {
-            data[i].value = src[i];
-        }
-    }
-
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         auto& col = assert_cast<ColumnBitmap&>(*dst);
@@ -167,17 +156,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnBitmap&>(column);
-        const size_t num_rows = column.size();
-        auto* data = col.get_data().data();
-
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(place).value |= data[i];
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h 
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index 2fc31b0360b..9e7192805d6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -88,7 +88,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         auto data = assert_cast<const 
ColumnFixedLengthObject&>(column).get_data().data();
         memcpy(places, data, sizeof(Data) * num_rows);
     }
@@ -119,16 +119,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
-        const size_t num_rows = column.size();
-        auto* data = reinterpret_cast<const Data*>(col.get_data().data());
-        for (size_t i = 0; i != num_rows; ++i) {
-            AggregateFunctionCount::data(place).count += data[i].count;
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
@@ -243,7 +233,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         auto data = assert_cast<const 
ColumnFixedLengthObject&>(column).get_data().data();
         memcpy(places, data, sizeof(Data) * num_rows);
     }
@@ -275,16 +265,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
-        const size_t num_rows = column.size();
-        auto* data = reinterpret_cast<const Data*>(col.get_data().data());
-        for (size_t i = 0; i != num_rows; ++i) {
-            AggregateFunctionCountNotNullUnary::data(place).count += 
data[i].count;
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map.h 
b/be/src/vec/aggregate_functions/aggregate_function_map.h
index f30b65bbb41..5e162ef7f35 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map.h
@@ -261,16 +261,6 @@ public:
         }
     }
 
-    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
-        const auto& col = assert_cast<const ColumnMap&>(column);
-        auto* data = &(this->data(places));
-        for (size_t i = 0; i != num_rows; ++i) {
-            auto map = col[i].get<TYPE_MAP>();
-            data->add(map[0], map[1]);
-        }
-    }
-
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         for (size_t i = 0; i != num_rows; ++i) {
@@ -279,16 +269,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnMap&>(column);
-        const size_t num_rows = column.size();
-        for (size_t i = 0; i != num_rows; ++i) {
-            auto map = col[i].get<TYPE_MAP>();
-            this->data(place).add(map[0], map[1]);
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h 
b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
index 0c806219dc3..a56c54b4291 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_map_v2.h
@@ -235,16 +235,6 @@ public:
         }
     }
 
-    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
-        const auto& col = assert_cast<const ColumnMap&>(column);
-        auto* data = &(this->data(places));
-        for (size_t i = 0; i != num_rows; ++i) {
-            auto map = col[i].get<TYPE_MAP>();
-            data->add(map[0], map[1]);
-        }
-    }
-
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
         for (size_t i = 0; i != num_rows; ++i) {
@@ -253,16 +243,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        const auto& col = assert_cast<const ColumnMap&>(column);
-        const size_t num_rows = column.size();
-        for (size_t i = 0; i != num_rows; ++i) {
-            auto map = col[i].get<TYPE_MAP>();
-            this->data(place).add(map[0], map[1]);
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h 
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index 835c2c0c440..7a5b43eca88 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -806,7 +806,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena& arena,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         if constexpr (Data::IsFixedLength) {
             const auto& col = assert_cast<const 
ColumnFixedLengthObject&>(column);
             auto* column_data = reinterpret_cast<const 
Data*>(col.get_data().data());
@@ -815,7 +815,8 @@ public:
                 data[i] = column_data[i];
             }
         } else {
-            Base::deserialize_from_column(places, column, arena, num_rows);
+            this->deserialize_vec(places, assert_cast<const 
ColumnString*>(&column), arena,
+                                  num_rows);
         }
     }
 
@@ -847,20 +848,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena& arena) const override {
-        if constexpr (Data::IsFixedLength) {
-            const auto& col = assert_cast<const 
ColumnFixedLengthObject&>(column);
-            auto* column_data = reinterpret_cast<const 
Data*>(col.get_data().data());
-            const size_t num_rows = column.size();
-            for (size_t i = 0; i != num_rows; ++i) {
-                this->data(place).change_if_better(column_data[i], arena);
-            }
-        } else {
-            Base::deserialize_and_merge_from_column(place, column, arena);
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena& arena) const override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null_v2.h 
b/be/src/vec/aggregate_functions/aggregate_function_null_v2.h
new file mode 100644
index 00000000000..62fe35dde4e
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_null_v2.h
@@ -0,0 +1,598 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/AggregateFunctions/AggregateFunctionNull.h
+// and modified by Doris
+
+#pragma once
+
+#include <glog/logging.h>
+
+#include <memory>
+
+#include "common/logging.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_distinct.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_buffer.hpp"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_nullable.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+template <typename NestFunction, bool result_is_nullable, typename Derived>
+class AggregateFunctionNullBaseInlineV2 : public 
IAggregateFunctionHelper<Derived> {
+protected:
+    std::unique_ptr<NestFunction> nested_function;
+    size_t prefix_size;
+    bool is_window_function = false;
+
+    // Address of the wrapped function's state, which begins `prefix_size` bytes into `place`.
+    AggregateDataPtr nested_place(AggregateDataPtr __restrict place) const noexcept {
+        return &place[prefix_size];
+    }
+
+    // Read-only overload of the accessor above.
+    ConstAggregateDataPtr nested_place(ConstAggregateDataPtr __restrict place) const noexcept {
+        return &place[prefix_size];
+    }
+
+    // Initialises the wrapper-owned prefix of `place`: the flag byte via init_flag() and the
+    // null counter via init_null_count(). The nested function's state is not touched here.
+    static void init(AggregateDataPtr __restrict place, bool is_window_function) noexcept {
+        init_flag(place);
+        init_null_count(place, is_window_function);
+    }
+
+    // Clears the flag kept in place[0]. Compiled out when the result is not nullable.
+    static void init_flag(AggregateDataPtr __restrict place) noexcept {
+        if constexpr (result_is_nullable) {
+            place[0] = false;
+        }
+    }
+
+    // Raises the flag kept in place[0]. Compiled out when the result is not nullable.
+    static void set_flag(AggregateDataPtr __restrict place) noexcept {
+        if constexpr (result_is_nullable) {
+            place[0] = true;
+        }
+    }
+
+    // Reads the flag kept in place[0]. When the result is not nullable `place` is not read and
+    // the answer is always true.
+    static bool get_flag(ConstAggregateDataPtr __restrict place) noexcept {
+        if constexpr (result_is_nullable) {
+            return place[0];
+        } else {
+            return true;
+        }
+    }
+
+    // Zeroes the int32 null counter stored right after the flag byte (at place + 1). Only done
+    // for window functions with a nullable result; otherwise nothing is written.
+    static void init_null_count(AggregateDataPtr __restrict place,
+                                bool is_window_function) noexcept {
+        if (is_window_function && result_is_nullable) {
+            unaligned_store<int32_t>(place + 1, 0);
+        }
+    }
+
+    // Moves the null counter at place + 1 by one step: up when `incremental` is true, down
+    // otherwise. A no-op unless this is a window function with a nullable result.
+    static void update_null_count(AggregateDataPtr __restrict place, bool incremental,
+                                  bool is_window_function) noexcept {
+        if (!(is_window_function && result_is_nullable)) {
+            return;
+        }
+        const auto current = unaligned_load<int32_t>(place + 1);
+        unaligned_store<int32_t>(place + 1, incremental ? current + 1 : current - 1);
+    }
+
+    // Returns the null counter stored at place + 1, or 0 when no counter is kept (i.e. not a
+    // window function, or the result is not nullable).
+    static int32_t get_null_count(ConstAggregateDataPtr __restrict place,
+                                  bool is_window_function) noexcept {
+        if (is_window_function && result_is_nullable) {
+            return unaligned_load<int32_t>(place + 1);
+        }
+        return 0;
+    }
+
+public:
+    // Wraps `nested_function_` (held by the unique_ptr member from here on) and computes
+    // `prefix_size`, the number of bytes this wrapper keeps in front of the nested state:
+    //   - result not nullable:              0 (no flag byte at all);
+    //   - nullable result, window function: flag byte + int32 null counter, rounded up to a
+    //                                       multiple of the nested function's alignment;
+    //   - nullable result, otherwise:       the nested function's alignment.
+    AggregateFunctionNullBaseInlineV2(IAggregateFunction* nested_function_,
+                                      const DataTypes& arguments, bool is_window_function_)
+            : IAggregateFunctionHelper<Derived>(arguments),
+              nested_function {assert_cast<NestFunction*>(nested_function_)},
+              is_window_function(is_window_function_) {
+        // NOTE(review): this check runs after the assert_cast in the initializer list above.
+        DCHECK(nested_function_ != nullptr);
+        if constexpr (result_is_nullable) {
+            if (this->is_window_function) {
+                // flag|---null_count----|-------padding-------|--nested_data----|
+                size_t nested_align = nested_function->align_of_data();
+                prefix_size = 1 + sizeof(int32_t);
+                // Pad so that the nested state starts on a multiple of its alignment.
+                if (prefix_size % nested_align != 0) {
+                    prefix_size += (nested_align - (prefix_size % nested_align));
+                }
+            } else {
+                prefix_size = nested_function->align_of_data();
+            }
+        } else {
+            prefix_size = 0;
+        }
+    }
+
+    // Column used to hold serialized states: the nested function's serialize column, wrapped
+    // in a ColumnNullable (with an empty null map) when the result is nullable.
+    MutableColumnPtr create_serialize_column() const override {
+        if constexpr (result_is_nullable) {
+            return ColumnNullable::create(nested_function->create_serialize_column(),
+                                          ColumnUInt8::create());
+        } else {
+            return nested_function->create_serialize_column();
+        }
+    }
+
+    // Type matching create_serialize_column(): the nested serialized type, made nullable when
+    // the result is nullable.
+    DataTypePtr get_serialized_type() const override {
+        DataTypePtr nested_type = nested_function->get_serialized_type();
+        if constexpr (result_is_nullable) {
+            return make_nullable(nested_type);
+        } else {
+            return nested_type;
+        }
+    }
+
+    // Passes the query context straight through to the nested function.
+    void set_query_context(QueryContext* ctx) override {
+        nested_function->set_query_context(ctx);
+    }
+
+    // Reports whatever the nested function reports.
+    bool is_blockable() const override { return nested_function->is_blockable(); }
+
+    // Records the version on this wrapper (through the helper base) and on the nested function.
+    void set_version(const int version_) override {
+        IAggregateFunctionHelper<Derived>::set_version(version_);
+        nested_function->set_version(version_);
+    }
+
+    // Name of the nested function wrapped as "NullableV2(<nested name>)".
+    String get_name() const override { return "NullableV2(" + nested_function->get_name() + ")"; }
+
+    // Return type of the nested function, made nullable when the result is nullable.
+    DataTypePtr get_return_type() const override {
+        if constexpr (result_is_nullable) {
+            return make_nullable(nested_function->get_return_type());
+        } else {
+            return nested_function->get_return_type();
+        }
+    }
+
+    // Initialises the wrapper prefix (flag and, for window functions, null counter), then
+    // creates the nested state behind it.
+    void create(AggregateDataPtr __restrict place) const override {
+        init(place, this->is_window_function);
+        nested_function->create(nested_place(place));
+    }
+
+    // Destroys the nested state only; nothing is done for the prefix bytes.
+    void destroy(AggregateDataPtr __restrict place) const noexcept override {
+        nested_function->destroy(nested_place(place));
+    }
+    // Re-initialises the wrapper prefix and resets the nested state.
+    void reset(AggregateDataPtr place) const override {
+        init(place, this->is_window_function);
+        nested_function->reset(nested_place(place));
+    }
+
+    // Always false for the wrapper, independent of what the nested function reports.
+    bool is_trivial() const override { return false; }
+
+    // Total state size: the wrapper prefix followed by the nested function's state.
+    size_t size_of_data() const override { return prefix_size + nested_function->size_of_data(); }
+
+    // Alignment of the combined state: the nested function's alignment, raised to at least
+    // alignof(int32_t) when a null counter is kept (window function with nullable result).
+    size_t align_of_data() const override {
+        const size_t nested_align = nested_function->align_of_data();
+        if (!(this->is_window_function && result_is_nullable)) {
+            return nested_align;
+        }
+        return std::max(nested_align, alignof(int32_t));
+    }
+
+    // Merges `rhs` into `place`. A right-hand side whose flag is unset contributes nothing;
+    // otherwise the flag of `place` is raised and the nested states are merged.
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena& arena) const override {
+        if (!get_flag(rhs)) {
+            return;
+        }
+        set_flag(place);
+        nested_function->merge(nested_place(place), nested_place(rhs), arena);
+    }
+
+    // Writes the flag (only when the result is nullable), followed by the nested state when
+    // the flag is set. With a non-nullable result the nested state is always written.
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        bool has_value = get_flag(place);
+        if constexpr (result_is_nullable) {
+            buf.write_binary(has_value);
+        }
+        if (!has_value) {
+            return;
+        }
+        nested_function->serialize(nested_place(place), buf);
+    }
+
+    // Mirror of serialize(): reads the flag (only when the result is nullable) and, when it is
+    // set, raises the flag of `place` and deserializes the nested state. When the stored flag
+    // is unset, `place` is left exactly as it was.
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena& arena) const override {
+        bool has_value = true;
+        if constexpr (result_is_nullable) {
+            buf.read_binary(has_value);
+        }
+        if (!has_value) {
+            return;
+        }
+        set_flag(place);
+        nested_function->deserialize(nested_place(place), buf, arena);
+    }
+
+    // Serializes `num_rows` states into `dst`. For a nullable result `dst` is a ColumnNullable:
+    // its null map is resized to `num_rows` and filled with the inverted flags, and the nested
+    // states (found at offset + prefix_size) go into its nested column.
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->serialize_to_column(places, offset, dst, num_rows);
+        } else {
+            auto& dst_nullable = assert_cast<ColumnNullable&>(*dst);
+            MutableColumnPtr dst_nested = dst_nullable.get_nested_column().assume_mutable();
+
+            auto& dst_null_map = dst_nullable.get_null_map_data();
+            dst_null_map.resize(num_rows);
+            uint8_t* __restrict null_bytes = dst_null_map.data();
+            for (size_t row = 0; row < num_rows; ++row) {
+                null_bytes[row] = !get_flag(places[row] + offset);
+            }
+            nested_function->serialize_to_column(places, offset + prefix_size, dst_nested,
+                                                 num_rows);
+        }
+    }
+
+    // Streaming pre-aggregation: turns every input row into its own serialized state.
+    //
+    // columns[0] must be a ColumnNullable. One temporary nested state per row is carved out of
+    // a single buffer; non-null rows are added to their state, null rows are left untouched.
+    // The states are then serialized into `dst` (into its nested column when the result is
+    // nullable, in which case the source null map is appended to the destination null map).
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena& arena) const override {
+        const auto* src_nullable_col = assert_cast<const ColumnNullable*>(columns[0]);
+        const auto* __restrict src_null_map_data = src_nullable_col->get_null_map_data().data();
+
+        size_t nested_size = nested_function->size_of_data();
+        std::vector<AggregateDataPtr> nested_places(num_rows);
+        std::vector<char> places_data(num_rows * nested_size);
+        for (size_t i = 0; i < num_rows; ++i) {
+            nested_places[i] = places_data.data() + i * nested_size;
+        }
+
+        // Row indices are size_t throughout so they compare cleanly against `num_rows`.
+        if (!nested_function->is_trivial()) {
+            for (size_t i = 0; i < num_rows; ++i) {
+                try {
+                    nested_function->create(nested_places[i]);
+                } catch (...) {
+                    // Roll back the states that were already created before rethrowing.
+                    for (size_t j = 0; j < i; ++j) {
+                        nested_function->destroy(nested_places[j]);
+                    }
+                    throw;
+                }
+            }
+        }
+        Defer destroy_places = {[&]() {
+            if (!nested_function->is_trivial()) {
+                for (size_t i = 0; i < num_rows; ++i) {
+                    nested_function->destroy(nested_places[i]);
+                }
+            }
+        }};
+        // The nested column is only read here, so its address is taken directly.
+        const IColumn* src_nested_column = &src_nullable_col->get_nested_column();
+        if (src_nullable_col->has_null()) {
+            for (size_t i = 0; i < num_rows; ++i) {
+                if (!src_null_map_data[i]) {
+                    nested_function->add(nested_places[i], &src_nested_column, i, arena);
+                }
+            }
+        } else {
+            nested_function->add_batch(num_rows, nested_places.data(), 0, &src_nested_column, arena,
+                                       false);
+        }
+
+        if constexpr (result_is_nullable) {
+            auto& dst_nullable_col = assert_cast<ColumnNullable&>(*dst);
+            MutableColumnPtr nested_col_ptr = dst_nullable_col.get_nested_column().assume_mutable();
+            dst_nullable_col.get_null_map_column().insert_range_from(
+                    src_nullable_col->get_null_map_column(), 0, num_rows);
+            nested_function->serialize_to_column(nested_places, 0, nested_col_ptr, num_rows);
+        } else {
+            nested_function->serialize_to_column(nested_places, 0, dst, num_rows);
+        }
+    }
+
+    // Appends the single state at `place` to `to`. For a nullable result `to` is a
+    // ColumnNullable: a set flag appends the nested serialized state with null-map entry 0, an
+    // unset flag appends a default nested value with null-map entry 1.
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         IColumn& to) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->serialize_without_key_to_column(nested_place(place), to);
+        } else {
+            auto& dst_nullable = assert_cast<ColumnNullable&>(to);
+            auto& dst_nested = dst_nullable.get_nested_column();
+            auto& dst_null_map = dst_nullable.get_null_map_data();
+
+            if (!get_flag(place)) {
+                dst_nested.insert_default();
+                dst_null_map.push_back(1);
+                return;
+            }
+            nested_function->serialize_without_key_to_column(nested_place(place), dst_nested);
+            dst_null_map.push_back(0);
+        }
+    }
+
+    // Merges one serialized state per row of `column` into places[i] + offset. For a nullable
+    // result `column` is a ColumnNullable: each destination flag is OR-ed with "source row is
+    // not null", then the nested column is handed to the nested function at
+    // offset + prefix_size.
+    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
+                                   AggregateDataPtr rhs, const IColumn* column, Arena& arena,
+                                   const size_t num_rows) const override {
+        if constexpr (!result_is_nullable) {
+            this->nested_function->deserialize_and_merge_vec(places, offset, rhs, column, arena,
+                                                             num_rows);
+        } else {
+            const auto& src_nullable = assert_cast<const ColumnNullable&>(*column);
+            const auto* __restrict src_null_map = src_nullable.get_null_map_data().data();
+
+            for (size_t row = 0; row < num_rows; ++row) {
+                *(places[row] + offset) |= (!src_null_map[row]);
+            }
+            nested_function->deserialize_and_merge_vec(places, offset + prefix_size, rhs,
+                                                       &src_nullable.get_nested_column(), arena,
+                                                       num_rows);
+        }
+    }
+
+    // "Selected" variant of deserialize_and_merge_vec(): merges one serialized state per row
+    // of `column` into places[i] + offset. For a nullable result each destination flag is
+    // OR-ed with "source row is not null" before the nested column is handed to the nested
+    // function at offset + prefix_size.
+    //
+    // NOTE(review): the `_selected` entry points are understood to receive a null places[i]
+    // for rows that were not selected - confirm against the IAggregateFunction contract. The
+    // flag update therefore skips null entries instead of writing through them.
+    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
+                                            AggregateDataPtr rhs, const IColumn* column,
+                                            Arena& arena, const size_t num_rows) const override {
+        if constexpr (result_is_nullable) {
+            const auto& nullable_col = assert_cast<const ColumnNullable&>(*column);
+            const auto& nested_col = nullable_col.get_nested_column();
+            const auto* __restrict null_map_data = nullable_col.get_null_map_data().data();
+
+            for (size_t i = 0; i < num_rows; ++i) {
+                if (places[i]) {
+                    *(places[i] + offset) |= (!null_map_data[i]);
+                }
+            }
+            nested_function->deserialize_and_merge_vec_selected(places, offset + prefix_size, rhs,
+                                                                &nested_col, arena, num_rows);
+        } else {
+            this->nested_function->deserialize_and_merge_vec_selected(places, offset, rhs, column,
+                                                                      arena, num_rows);
+        }
+    }
+
+    // Merges the serialized states in rows [begin, end] of `column` into the single `place`.
+    // For a nullable result `column` is a ColumnNullable: null rows are skipped, and every
+    // non-null row raises the flag and is merged on its own through the nested function.
+    //
+    // NOTE(review): the loop treats `end` as inclusive, while the DCHECK below still accepts
+    // end == column.size() - confirm which bound callers actually pass.
+    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
+                                                 const IColumn& column, size_t begin, size_t end,
+                                                 Arena& arena) const override {
+        DCHECK(end <= column.size() && begin <= end)
+                << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
+
+        if constexpr (result_is_nullable) {
+            const auto& nullable_col = assert_cast<const ColumnNullable&>(column);
+            const auto& nested_col = nullable_col.get_nested_column();
+            const auto& null_map = nullable_col.get_null_map_data();
+
+            for (size_t i = begin; i <= end; ++i) {
+                if (!null_map[i]) {
+                    set_flag(place);
+                    // Single-row range [i, i] on the nested column.
+                    nested_function->deserialize_and_merge_from_column_range(
+                            nested_place(place), nested_col, i, i, arena);
+                }
+            }
+        } else {
+            // prefix_size is 0 here, so `place` already is the nested state.
+            nested_function->deserialize_and_merge_from_column_range(place, column, begin, end,
+                                                                     arena);
+        }
+    }
+
+    // Appends the final result for `place` to `to`. For a nullable result `to` is a
+    // ColumnNullable: a set flag appends the nested result with null-map entry 0, an unset
+    // flag appends the column's default value.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        if constexpr (!result_is_nullable) {
+            nested_function->insert_result_into(nested_place(place), to);
+        } else {
+            auto& nullable_to = assert_cast<ColumnNullable&>(to);
+            if (!get_flag(place)) {
+                nullable_to.insert_default();
+                return;
+            }
+            nested_function->insert_result_into(nested_place(place),
+                                                nullable_to.get_nested_column());
+            nullable_to.get_null_map_data().push_back(0);
+        }
+    }
+};
+
+template <typename NestFuction, bool result_is_nullable>
+class AggregateFunctionNullUnaryInlineV2 final
+        : public AggregateFunctionNullBaseInlineV2<
+                  NestFuction, result_is_nullable,
+                  AggregateFunctionNullUnaryInlineV2<NestFuction, 
result_is_nullable>> {
+public:
+    // Forwards all arguments unchanged to the AggregateFunctionNullBaseInlineV2 constructor.
+    AggregateFunctionNullUnaryInlineV2(IAggregateFunction* nested_function_,
+                                       const DataTypes& arguments, bool is_window_function_)
+            : AggregateFunctionNullBaseInlineV2<
+                      NestFuction, result_is_nullable,
+                      AggregateFunctionNullUnaryInlineV2<NestFuction, result_is_nullable>>(
+                      nested_function_, arguments, is_window_function_) {}
+
+    // Adds row `row_num` of the nullable column columns[0] to `place`. A null row only bumps
+    // the null counter (which is a no-op outside window functions); a non-null row raises the
+    // flag and is added to the nested state.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
+             Arena& arena) const override {
+        const auto* nullable_col =
+                assert_cast<const ColumnNullable*, TypeCheckOnRelease::DISABLE>(columns[0]);
+        if (nullable_col->is_null_at(row_num)) {
+            this->update_null_count(place, true, this->is_window_function);
+            return;
+        }
+        this->set_flag(place);
+        const IColumn* nested_column = &nullable_col->get_nested_column();
+        this->nested_function->add(this->nested_place(place), &nested_column, row_num, arena);
+    }
+
+    // Asks the nested function for its stable counterpart and, if there is one, returns a new
+    // wrapper of the same shape around it. Returns nullptr when the nested function has none.
+    IAggregateFunction* transmit_to_stable() override {
+        using StableNested = typename FunctionStableTransfer<NestFuction>::FunctionStable;
+
+        auto stable_nested = this->nested_function->transmit_to_stable();
+        if (!stable_nested) {
+            return nullptr;
+        }
+        return new AggregateFunctionNullUnaryInlineV2<StableNested, result_is_nullable>(
+                stable_nested, IAggregateFunction::argument_types, this->is_window_function);
+    }
+
+    // Adds row i of the nullable column columns[0] to places[i] + place_offset, for every i in
+    // [0, batch_size).
+    //   - Column contains nulls: null rows are skipped; non-null rows raise the flag and are
+    //     added one by one.
+    //   - No nulls, nullable result: every row raises the flag and is added one by one.
+    //   - No nulls, non-nullable result: there is no prefix, so the whole batch is handed to
+    //     the nested function's add_batch.
+    // Row indices are size_t so they compare cleanly against `batch_size`.
+    void add_batch(size_t batch_size, AggregateDataPtr* __restrict places, size_t place_offset,
+                   const IColumn** columns, Arena& arena, bool agg_many) const override {
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        const IColumn* nested_column = &column->get_nested_column();
+        if (column->has_null()) {
+            const auto* __restrict null_map_data = column->get_null_map_data().data();
+            for (size_t i = 0; i < batch_size; ++i) {
+                if (!null_map_data[i]) {
+                    AggregateDataPtr __restrict place = places[i] + place_offset;
+                    this->set_flag(place);
+                    this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
+                }
+            }
+        } else {
+            if constexpr (result_is_nullable) {
+                for (size_t i = 0; i < batch_size; ++i) {
+                    AggregateDataPtr __restrict place = places[i] + place_offset;
+                    this->set_flag(place);
+                    this->nested_function->add(this->nested_place(place), &nested_column, i, arena);
+                }
+            } else {
+                this->nested_function->add_batch(batch_size, places, place_offset, &nested_column,
+                                                 arena, agg_many);
+            }
+        }
+    }
+
+    // Adds rows [0, batch_size) of the nullable column columns[0] to one `place`. With nulls
+    // present the rows go through add() one at a time; otherwise the flag is raised and the
+    // nested column is handed to the nested function in one call.
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena& arena) const override {
+        const auto* nullable_col = assert_cast<const ColumnNullable*>(columns[0]);
+        if (nullable_col->has_null()) {
+            for (size_t row = 0; row < batch_size; ++row) {
+                this->add(place, columns, row, arena);
+            }
+            return;
+        }
+
+        this->set_flag(place);
+        const IColumn* nested_column = &nullable_col->get_nested_column();
+        this->nested_function->add_batch_single_place(batch_size, this->nested_place(place),
+                                                      &nested_column, arena);
+    }
+
+    // Adds rows [batch_begin, batch_end] (both inclusive) of the nullable column columns[0] to
+    // one `place`. When the caller reports nulls the rows go through add() one at a time;
+    // otherwise the flag is raised and the range is handed to the nested function.
+    void add_batch_range(size_t batch_begin, size_t batch_end, AggregateDataPtr place,
+                         const IColumn** columns, Arena& arena, bool has_null) override {
+        const auto* nullable_col = assert_cast<const ColumnNullable*>(columns[0]);
+
+        if (has_null) {
+            for (size_t row = batch_begin; row <= batch_end; ++row) {
+                this->add(place, columns, row, arena);
+            }
+            return;
+        }
+
+        this->set_flag(place);
+        const IColumn* nested_column = &nullable_col->get_nested_column();
+        this->nested_function->add_batch_range(batch_begin, batch_end, this->nested_place(place),
+                                               &nested_column, arena, false);
+    }
+
+    // Window-frame accumulation into one `place`.
+    //
+    // The frame [frame_start, frame_end) is first clamped to the partition. If the clamped
+    // frame is empty and no previous result may be reused, the flag is cleared,
+    // *use_null_result is set and nothing else happens. If the clamped frame is non-empty,
+    // *use_null_result is cleared and *could_use_previous_result is set.
+    //
+    // With nulls in the column the clamped rows go through add() one by one; otherwise the
+    // flag is raised and the unclamped bounds are forwarded to the nested function.
+    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
+                                Arena& arena, UInt8* use_null_result,
+                                UInt8* could_use_previous_result) const override {
+        auto current_frame_start = std::max<int64_t>(frame_start, partition_start);
+        auto current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            if (!*could_use_previous_result) {
+                this->init_flag(place);
+                *use_null_result = true;
+                return;
+            }
+            // Empty frame but a previous result may be reused: fall through to the code below.
+        } else {
+            *use_null_result = false;
+            *could_use_previous_result = true;
+        }
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        bool has_null = column->has_null();
+        if (has_null) {
+            for (size_t i = current_frame_start; i < current_frame_end; ++i) {
+                this->add(place, columns, i, arena);
+            }
+        } else {
+            const IColumn* nested_column = &(column->get_nested_column());
+            this->set_flag(place);
+            this->nested_function->add_range_single_place(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), &nested_column, arena, use_null_result,
+                    could_use_previous_result);
+        }
+    }
+
+    // Incremental window evaluation is supported exactly when the nested function supports it.
+    bool supported_incremental_mode() const override {
+        return this->nested_function->supported_incremental_mode();
+    }
+
+    // Incremental window-frame evaluation for a nullable input column.
+    //
+    // The frame [frame_start, frame_end) is clamped to the partition; an empty clamped frame
+    // sets *use_null_result, clears the flag and returns.
+    //
+    // Column without nulls: the call is forwarded to the nested function (incremental when a
+    // previous result may be reused, add_range_single_place otherwise) and the flag is raised.
+    //
+    // Column with nulls and a reusable previous result: the null counter is adjusted for the
+    // row leaving the frame (frame_start - 1) and the row entering it (frame_end - 1), the
+    // nested function is invoked with {nested column, null map column} as its columns, and the
+    // flag is cleared when every row of the clamped frame is null, raised otherwise.
+    //
+    // Column with nulls and no reusable previous result: falls back to
+    // add_range_single_place().
+    //
+    // Note that `previous_is_nul`, `end_is_nul` and `has_null` are only forwarded on the
+    // no-null path; the with-nulls path derives its own values from the null map.
+    void execute_function_with_incremental(int64_t partition_start, int64_t partition_end,
+                                           int64_t frame_start, int64_t frame_end,
+                                           AggregateDataPtr place, const IColumn** columns,
+                                           Arena& arena, bool previous_is_nul, bool end_is_nul,
+                                           bool has_null, UInt8* use_null_result,
+                                           UInt8* could_use_previous_result) const override {
+        int64_t current_frame_start = std::max<int64_t>(frame_start, partition_start);
+        int64_t current_frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (current_frame_start >= current_frame_end) {
+            *use_null_result = true;
+            this->init_flag(place);
+            return;
+        }
+
+        DCHECK(columns[0]->is_nullable()) << columns[0]->get_name();
+        const auto* column = assert_cast<const ColumnNullable*>(columns[0]);
+        const IColumn* nested_column = &column->get_nested_column();
+
+        if (!column->has_null()) {
+            if (*could_use_previous_result) {
+                this->nested_function->execute_function_with_incremental(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, previous_is_nul,
+                        end_is_nul, false, use_null_result, could_use_previous_result);
+            } else {
+                this->nested_function->add_range_single_place(
+                        partition_start, partition_end, frame_start, frame_end,
+                        this->nested_place(place), &nested_column, arena, use_null_result,
+                        could_use_previous_result);
+            }
+            this->set_flag(place);
+            return;
+        }
+
+        const auto* __restrict null_map_data = column->get_null_map_data().data();
+        if (*could_use_previous_result) {
+            // Row that just left the frame and row that just entered it.
+            auto outcoming_pos = frame_start - 1;
+            auto incoming_pos = frame_end - 1;
+            bool is_previous_frame_start_null = false;
+            if (outcoming_pos >= partition_start && outcoming_pos < partition_end &&
+                null_map_data[outcoming_pos] == 1) {
+                is_previous_frame_start_null = true;
+                DCHECK_EQ(result_is_nullable, true);
+                DCHECK_EQ(this->is_window_function, true);
+                this->update_null_count(place, false, this->is_window_function);
+            }
+            bool is_current_frame_end_null = false;
+            if (incoming_pos >= partition_start && incoming_pos < partition_end &&
+                null_map_data[incoming_pos] == 1) {
+                is_current_frame_end_null = true;
+                DCHECK_EQ(result_is_nullable, true);
+                DCHECK_EQ(this->is_window_function, true);
+                this->update_null_count(place, true, this->is_window_function);
+            }
+            // The nested function receives the null map as a second column on this path.
+            const IColumn* columns_tmp[2] {nested_column, &(*column->get_null_map_column_ptr())};
+            this->nested_function->execute_function_with_incremental(
+                    partition_start, partition_end, frame_start, frame_end,
+                    this->nested_place(place), columns_tmp, arena, is_previous_frame_start_null,
+                    is_current_frame_end_null, true, use_null_result, could_use_previous_result);
+            DCHECK_EQ(result_is_nullable, true);
+            DCHECK_EQ(this->is_window_function, true);
+            // Every row of the clamped frame is null: the result is null.
+            if (current_frame_end - current_frame_start ==
+                this->get_null_count(place, this->is_window_function)) {
+                this->init_flag(place);
+            } else {
+                this->set_flag(place);
+            }
+        } else {
+            this->add_range_single_place(partition_start, partition_end, frame_start, frame_end,
+                                         place, columns, arena, use_null_result,
+                                         could_use_previous_result);
+        }
+    }
+};
+
+} // namespace doris::vectorized
+
+#include "common/compile_check_end.h"
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h 
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 62822288860..2054886b16d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -134,7 +134,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
         auto* data = col.get_data().data();
         memcpy(places, data, sizeof(Data) * num_rows);
@@ -168,16 +168,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
-        const size_t num_rows = column.size();
-        auto* data = reinterpret_cast<const Data*>(col.get_data().data());
-        for (size_t i = 0; i != num_rows; ++i) {
-            this->data(place).sum += data[i].sum;
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h 
b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
index 536a64af602..1c27c37b5ac 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq_distribute_key.h
@@ -165,7 +165,7 @@ public:
     }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena&,
-                                 size_t num_rows) const override {
+                                 size_t num_rows) const {
         auto data = reinterpret_cast<const UInt64*>(
                 assert_cast<const 
ColumnFixedLengthObject&>(column).get_data().data());
         for (size_t i = 0; i != num_rows; ++i) {
@@ -199,16 +199,6 @@ public:
         }
     }
 
-    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
-                                           Arena&) const override {
-        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
-        const size_t num_rows = column.size();
-        auto* data = reinterpret_cast<const UInt64*>(col.get_data().data());
-        for (size_t i = 0; i != num_rows; ++i) {
-            AggregateFunctionUniqDistributeKey::data(place).count += data[i];
-        }
-    }
-
     void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict 
place,
                                                  const IColumn& column, size_t 
begin, size_t end,
                                                  Arena&) const override {
diff --git a/be/src/vec/aggregate_functions/helpers.h 
b/be/src/vec/aggregate_functions/helpers.h
index 2317925de7b..5762aa7ad4d 100644
--- a/be/src/vec/aggregate_functions/helpers.h
+++ b/be/src/vec/aggregate_functions/helpers.h
@@ -23,6 +23,7 @@
 #include "runtime/define_primitive_type.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/aggregate_functions/aggregate_function_null.h"
+#include "vec/aggregate_functions/aggregate_function_null_v2.h"
 #include "vec/core/call_on_type_index.h"
 #include "vec/data_types/data_type.h"
 #include "vec/utils/template_helpers.hpp"
@@ -68,12 +69,13 @@
                                     decltype(&IAggregateFunctionHelper<        
                    \
                                              
FunctionTemplate>::serialize_without_key_to_column)>, \
                     "need to override serialize_without_key_to_column");       
                    \
-            static_assert(!std::is_same_v<                                     
                    \
-                                  
decltype(&FunctionTemplate::deserialize_and_merge_from_column),  \
-                                  decltype(&IAggregateFunctionHelper<          
                    \
-                                           
FunctionTemplate>::deserialize_and_merge_from_column)>, \
-                          "need to override "                                  
                    \
-                          "deserialize_and_merge_from_column");                
                    \
+            static_assert(                                                                         \
+                    !std::is_same_v<                                                               \
+                            decltype(&FunctionTemplate::deserialize_and_merge_from_column_range),  \
+                            decltype(&IAggregateFunctionHelper<                                    \
+                                     FunctionTemplate>::deserialize_and_merge_from_column_range)>, \
+                    "need to override "                                                            \
+                    "deserialize_and_merge_from_column_range");                                    \
         }                                                                      
                    \
     } while (false)
 
@@ -85,6 +87,11 @@ struct creator_without_type {
     using NullableT = std::conditional_t<multi_arguments, 
AggregateFunctionNullVariadicInline<T, f>,
                                          AggregateFunctionNullUnaryInline<T, 
f>>;
 
+    // Null-wrapper selection used when `enable_aggregate_function_null_v2` is set:
+    // single-argument functions get AggregateFunctionNullUnaryInlineV2, while multi-argument
+    // functions still use AggregateFunctionNullVariadicInline.
+    template <bool multi_arguments, bool f, typename T>
+    using NullableV2T =
+            std::conditional_t<multi_arguments, AggregateFunctionNullVariadicInline<T, f>,
+                               AggregateFunctionNullUnaryInlineV2<T, f>>;
+
     template <typename AggregateFunctionTemplate>
     static AggregateFunctionPtr creator(const std::string& name, const 
DataTypes& argument_types,
                                         const DataTypePtr& result_type,
@@ -142,9 +149,15 @@ struct creator_without_type {
         if (have_nullable(argument_types_)) {
             std::visit(
                     [&](auto multi_arguments, auto result_is_nullable) {
-                        result.reset(new NullableT<multi_arguments, result_is_nullable,
-                                                   AggregateFunctionTemplate>(
-                                result.release(), argument_types_, attr.is_window_function));
+                        if (attr.enable_aggregate_function_null_v2) {
+                            result.reset(new NullableV2T<multi_arguments, result_is_nullable,
+                                                         AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        } else {
+                            result.reset(new NullableT<multi_arguments, result_is_nullable,
+                                                       AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        }
                     },
                     make_bool_variant(argument_types_.size() > 1),
                     make_bool_variant(result_is_nullable));
@@ -166,11 +179,21 @@ struct creator_without_type {
                 std::forward<TArgs>(args)..., remove_nullable(argument_types_)));
         if (have_nullable(argument_types_)) {
             if (argument_types_.size() > 1) {
-                result.reset(new NullableT<true, false, AggregateFunctionTemplate>(
-                        result.release(), argument_types_, attr.is_window_function));
+                if (attr.enable_aggregate_function_null_v2) {
+                    result.reset(new NullableV2T<true, false, AggregateFunctionTemplate>(
+                            result.release(), argument_types_, attr.is_window_function));
+                } else {
+                    result.reset(new NullableT<true, false, AggregateFunctionTemplate>(
+                            result.release(), argument_types_, attr.is_window_function));
+                }
             } else {
-                result.reset(new NullableT<false, false, AggregateFunctionTemplate>(
-                        result.release(), argument_types_, attr.is_window_function));
+                if (attr.enable_aggregate_function_null_v2) {
+                    result.reset(new NullableV2T<false, false, AggregateFunctionTemplate>(
+                            result.release(), argument_types_, attr.is_window_function));
+                } else {
+                    result.reset(new NullableT<false, false, AggregateFunctionTemplate>(
+                            result.release(), argument_types_, attr.is_window_function));
+                }
             }
         }
 
@@ -192,10 +215,15 @@ struct creator_without_type {
         if (have_nullable(argument_types_)) {
             std::visit(
                     [&](auto result_is_nullable) {
-                        result.reset(
-                                new NullableT<true, result_is_nullable, AggregateFunctionTemplate>(
-                                        result.release(), argument_types_,
-                                        attr.is_window_function));
+                        if (attr.enable_aggregate_function_null_v2) {
+                            result.reset(new NullableV2T<true, result_is_nullable,
+                                                         AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        } else {
+                            result.reset(new NullableT<true, result_is_nullable,
+                                                       AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        }
                     },
                     make_bool_variant(result_is_nullable));
         }
@@ -220,8 +248,13 @@ struct creator_without_type {
         std::unique_ptr<IAggregateFunction> result(std::make_unique<AggregateFunctionTemplate>(
                 std::forward<TArgs>(args)..., remove_nullable(argument_types_)));
         if (have_nullable(argument_types_)) {
-            result.reset(new NullableT<true, false, AggregateFunctionTemplate>(
-                    result.release(), argument_types_, attr.is_window_function));
+            if (attr.enable_aggregate_function_null_v2) {
+                result.reset(new NullableV2T<true, false, AggregateFunctionTemplate>(
+                        result.release(), argument_types_, attr.is_window_function));
+            } else {
+                result.reset(new NullableT<true, false, AggregateFunctionTemplate>(
+                        result.release(), argument_types_, attr.is_window_function));
+            }
         }
         CHECK_AGG_FUNCTION_SERIALIZED_TYPE(AggregateFunctionTemplate);
         return AggregateFunctionPtr(result.release());
@@ -241,10 +274,15 @@ struct creator_without_type {
         if (have_nullable(argument_types_)) {
             std::visit(
                     [&](auto result_is_nullable) {
-                        result.reset(
-                                new NullableT<false, result_is_nullable, AggregateFunctionTemplate>(
-                                        result.release(), argument_types_,
-                                        attr.is_window_function));
+                        if (attr.enable_aggregate_function_null_v2) {
+                            result.reset(new NullableV2T<false, result_is_nullable,
+                                                         AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        } else {
+                            result.reset(new NullableT<false, result_is_nullable,
+                                                       AggregateFunctionTemplate>(
+                                    result.release(), argument_types_, attr.is_window_function));
+                        }
                     },
                     make_bool_variant(result_is_nullable));
         }
@@ -268,8 +306,13 @@ struct creator_without_type {
         std::unique_ptr<IAggregateFunction> result(std::make_unique<AggregateFunctionTemplate>(
                 std::forward<TArgs>(args)..., remove_nullable(argument_types_)));
         if (have_nullable(argument_types_)) {
-            result.reset(new NullableT<false, false, AggregateFunctionTemplate>(
-                    result.release(), argument_types_, attr.is_window_function));
+            if (attr.enable_aggregate_function_null_v2) {
+                result.reset(new NullableV2T<false, false, AggregateFunctionTemplate>(
+                        result.release(), argument_types_, attr.is_window_function));
+            } else {
+                result.reset(new NullableT<false, false, AggregateFunctionTemplate>(
+                        result.release(), argument_types_, attr.is_window_function));
+            }
         }
         CHECK_AGG_FUNCTION_SERIALIZED_TYPE(AggregateFunctionTemplate);
         return AggregateFunctionPtr(result.release());
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 7da847d8d63..22ea6ccff4d 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -217,6 +217,8 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc,
                     state->be_exec_version(),
                     {.is_window_function = _is_window_function,
                      .is_foreach = is_foreach,
+                     .enable_aggregate_function_null_v2 =
+                             state->enable_aggregate_function_null_v2(),
                      .column_names = std::move(column_names)});
         } else {
             _function = AggregateFunctionSimpleFactory::instance().get(
@@ -224,6 +226,8 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc,
                     state->be_exec_version(),
                     {.is_window_function = _is_window_function,
                      .is_foreach = is_foreach,
+                     .enable_aggregate_function_null_v2 =
+                             state->enable_aggregate_function_null_v2(),
                      .column_names = std::move(column_names)});
         }
     }
@@ -285,13 +289,6 @@ Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
     return Status::OK();
 }
 
-Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
-                                               const size_t num_rows, Arena& arena) {
-    RETURN_IF_ERROR(_calc_argument_columns(block));
-    _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena);
-    return Status::OK();
-}
-
 Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
                                                          const size_t num_rows, Arena& arena) {
     RETURN_IF_ERROR(_calc_argument_columns(block));
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index 52555054d35..a5e75fda192 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -82,9 +82,6 @@ public:
     Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
                                       Arena& arena);
 
-    Status streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
-                                   Arena& arena);
-
     Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
                                              const size_t num_rows, Arena& arena);
 
diff --git a/be/test/vec/aggregate_functions/agg_function_test.h b/be/test/vec/aggregate_functions/agg_function_test.h
index ea3f67d940a..47577d58a45 100644
--- a/be/test/vec/aggregate_functions/agg_function_test.h
+++ b/be/test/vec/aggregate_functions/agg_function_test.h
@@ -163,18 +163,6 @@ private:
                 std::vector<AggregateDataPtr> places {place};
                 agg_fn->function()->serialize_to_column(places, 0, serialize_column, 1);
             }
-
-            {
-                Arena arena;
-                auto* place = reinterpret_cast<vectorized::AggregateDataPtr>(
-                        arena.alloc(agg_fn->function()->size_of_data()));
-
-                agg_fn->create(place);
-                Defer defer([&]() { agg_fn->destroy(place); });
-                agg_fn->function()->deserialize_from_column(place, *serialize_column, arena, 1);
-
-                check_result(place);
-            }
         }
 
         { // streaming_agg_serialize_to_column deserialize_and_merge_from_column_range
diff --git a/be/test/vec/aggregate_functions/agg_min_max_test.cpp b/be/test/vec/aggregate_functions/agg_min_max_test.cpp
index 54ac10cd705..79467129f9c 100644
--- a/be/test/vec/aggregate_functions/agg_min_max_test.cpp
+++ b/be/test/vec/aggregate_functions/agg_min_max_test.cpp
@@ -112,12 +112,19 @@ TEST_P(AggMinMaxTest, min_max_decimal_test) {
     agg_function->streaming_agg_serialize_to_column(column, dst, agg_test_batch_size, arena);
 
     std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data() * agg_test_batch_size]);
-    AggregateDataPtr places = memory2.get();
-    agg_function->deserialize_from_column(places, *dst, arena, agg_test_batch_size);
+    std::unique_ptr<char[]> memory2_tmp(
+            new char[agg_function->size_of_data() * agg_test_batch_size]);
+    std::vector<AggregateDataPtr> places(agg_test_batch_size);
+    for (size_t i = 0; i != agg_test_batch_size; ++i) {
+        places[i] = memory2.get() + agg_function->size_of_data() * i;
+        agg_function->create(places[i]);
+    }
+    agg_function->deserialize_and_merge_vec(places.data(), 0, memory2_tmp.get(), dst.get(), arena,
+                                            agg_test_batch_size);
 
     ColumnDecimal128V2 result(0, 9);
     for (size_t i = 0; i != agg_test_batch_size; ++i) {
-        agg_function->insert_result_into(places + agg_function->size_of_data() * i, result);
+        agg_function->insert_result_into(places[i], result);
     }
 
     for (size_t i = 0; i != agg_test_batch_size; ++i) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4ef012cb20a..75504d3b76e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -312,6 +312,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
 
     public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";
+    public static final String ENABLE_AGGREGATE_FUNCTION_NULL_V2 = "enable_aggregate_function_null_v2";
 
     public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = "enable_new_shuffle_hash_method";
 
@@ -1478,6 +1479,9 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     private boolean enableParallelScan = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_AGGREGATE_FUNCTION_NULL_V2, fuzzy = true, needForward = true)
+    private boolean enableAggregateFunctionNullV2 = true;
+
     @VariableMgr.VarAttr(name = OPTIMIZE_INDEX_SCAN_PARALLELISM,
             needForward = true,
             description = {"优化索引扫描时的 Scan 并行度,该优化目前只对 ann topn 查询生效",
@@ -5113,6 +5117,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
         tResult.setCteMaxRecursionDepth(cteMaxRecursionDepth);
         tResult.setEnableParallelScan(enableParallelScan);
+        tResult.setEnableAggregateFunctionNullV2(enableAggregateFunctionNullV2);
         tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
         tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setOptimizeIndexScanParallelism(optimizeIndexScanParallelism);
@@ -5513,6 +5518,10 @@ public class SessionVariable implements Serializable, Writable {
         return enableParallelScan;
     }
 
+    public boolean getEnableAggregateFunctionNullV2() {
+        return enableAggregateFunctionNullV2;
+    }
+
     public boolean enableParallelResultSink() {
         return enableParallelResultSink;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index dbfc8f2de76..dd7c0b0c453 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -425,6 +425,7 @@ struct TQueryOptions {
   184: optional i32 cte_max_recursion_depth;
 
   185: optional bool enable_parquet_file_page_cache = true;
+  186: optional bool enable_aggregate_function_null_v2 = false;
 
   186: optional bool enable_streaming_agg_force_passthrough;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to