This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 86ef0069ea8 [Feature](function) support group concat with distinct and order by (#38851)
86ef0069ea8 is described below

commit 86ef0069ea85ee32f5d73317f86ce76f33974b11
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Aug 5 15:44:51 2024 +0800

    [Feature](function) support group concat with distinct and order by (#38851)
    
    pick from #38744 and #38776
---
 .../vec/aggregate_functions/aggregate_function.h   |  11 +-
 .../aggregate_function_distinct.cpp                |  25 +--
 .../aggregate_function_distinct.h                  | 206 +++++++++++++++------
 .../aggregate_function_foreach.h                   |   2 -
 .../aggregate_functions/aggregate_function_null.h  |   2 -
 .../aggregate_function_simple_factory.cpp          |   1 -
 .../aggregate_functions/aggregate_function_sort.h  |  13 +-
 7 files changed, 173 insertions(+), 87 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index c74e22bdbcd..74700dff17f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -43,6 +43,8 @@ class AggregateFunctionBitmapCount;
 template <typename Op>
 class AggregateFunctionBitmapOp;
 struct AggregateFunctionBitmapUnionOp;
+class IAggregateFunction;
+using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
 
 using DataTypePtr = std::shared_ptr<const IDataType>;
 using DataTypes = std::vector<DataTypePtr>;
@@ -178,11 +180,6 @@ public:
                                         const size_t offset, IColumn& to,
                                         const size_t num_rows) const = 0;
 
-    /** Returns true for aggregate functions of type -State.
-      * They are executed as other aggregate functions, but not finalized 
(return an aggregation state that can be combined with another).
-      */
-    virtual bool is_state() const { return false; }
-
     /** Contains a loop with calls to "add" function. You can collect 
arguments into array "places"
       *  and do a single call to "add_batch" for devirtualization and inlining.
       */
@@ -223,6 +220,8 @@ public:
 
     virtual void set_version(const int version_) { version = version_; }
 
+    virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; }
+
 protected:
     DataTypes argument_types;
     int version {};
@@ -519,8 +518,6 @@ public:
     }
 };
 
-using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
-
 class AggregateFunctionGuard {
 public:
     using AggregateData = std::remove_pointer_t<AggregateDataPtr>;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
index 5b2269a27d9..4773a620e0a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
@@ -29,6 +29,16 @@
 
 namespace doris::vectorized {
 
+template <typename T>
+struct Reducer {
+    template <bool stable>
+    using Output = AggregateFunctionDistinctSingleNumericData<T, stable>;
+    using AggregateFunctionDistinctNormal = AggregateFunctionDistinct<Output, 
false>;
+};
+
+template <typename T>
+using AggregateFunctionDistinctNumeric = 
Reducer<T>::AggregateFunctionDistinctNormal;
+
 class AggregateFunctionCombinatorDistinct final : public 
IAggregateFunctionCombinator {
 public:
     String get_name() const override { return "Distinct"; }
@@ -51,22 +61,15 @@ public:
 
         if (arguments.size() == 1) {
             AggregateFunctionPtr res(
-                    
creator_with_numeric_type::create<AggregateFunctionDistinct,
-                                                      
AggregateFunctionDistinctSingleNumericData>(
+                    
creator_with_numeric_type::create<AggregateFunctionDistinctNumeric>(
                             arguments, result_is_nullable, nested_function));
             if (res) {
                 return res;
             }
 
-            if 
(arguments[0]->is_value_unambiguously_represented_in_contiguous_memory_region())
 {
-                res = creator_without_type::create<AggregateFunctionDistinct<
-                        AggregateFunctionDistinctSingleGenericData<true>>>(
-                        arguments, result_is_nullable, nested_function);
-            } else {
-                res = creator_without_type::create<AggregateFunctionDistinct<
-                        AggregateFunctionDistinctSingleGenericData<false>>>(
-                        arguments, result_is_nullable, nested_function);
-            }
+            res = creator_without_type::create<
+                    
AggregateFunctionDistinct<AggregateFunctionDistinctSingleGenericData>>(
+                    arguments, result_is_nullable, nested_function);
             return res;
         }
         return creator_without_type::create<
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
index c0c7a5b66dd..4f42e8509f2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
@@ -28,6 +28,8 @@
 #include <memory>
 #include <new>
 #include <string>
+#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -54,105 +56,170 @@ struct DefaultHash;
 
 namespace doris::vectorized {
 
-template <typename T>
+template <typename T, bool stable>
 struct AggregateFunctionDistinctSingleNumericData {
     /// When creating, the hash table must be small.
-    using Set = HashSetWithStackMemory<T, DefaultHash<T>, 4>;
-    using Self = AggregateFunctionDistinctSingleNumericData<T>;
-    Set set;
+    using Container = std::conditional_t<stable, phmap::flat_hash_map<T, 
uint32_t>,
+                                         HashSetWithStackMemory<T, 
DefaultHash<T>, 4>>;
+    using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
+    Container data;
 
     void add(const IColumn** columns, size_t /* columns_num */, size_t 
row_num, Arena*) {
         const auto& vec = assert_cast<const 
ColumnVector<T>&>(*columns[0]).get_data();
-        set.insert(vec[row_num]);
+        if constexpr (stable) {
+            data.emplace(vec[row_num], data.size());
+        } else {
+            data.insert(vec[row_num]);
+        }
     }
 
-    void merge(const Self& rhs, Arena*) { set.merge(rhs.set); }
+    void merge(const Self& rhs, Arena*) {
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            data.merge(rhs.data);
+        }
+    }
 
-    void serialize(BufferWritable& buf) const { set.write(buf); }
+    void serialize(BufferWritable& buf) const {
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            data.write(buf);
+        }
+    }
 
-    void deserialize(BufferReadable& buf, Arena*) { set.read(buf); }
+    void deserialize(BufferReadable& buf, Arena*) {
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            data.read(buf);
+        }
+    }
 
     MutableColumns get_arguments(const DataTypes& argument_types) const {
         MutableColumns argument_columns;
         argument_columns.emplace_back(argument_types[0]->create_column());
-        for (const auto& elem : set) {
-            argument_columns[0]->insert(elem.get_value());
+
+        if constexpr (stable) {
+            argument_columns[0]->resize(data.size());
+            auto ptr = 
(T*)const_cast<char*>(argument_columns[0]->get_raw_data().data);
+            for (auto it : data) {
+                ptr[it.second] = it.first;
+            }
+        } else {
+            for (const auto& elem : data) {
+                argument_columns[0]->insert(elem.get_value());
+            }
         }
 
         return argument_columns;
     }
 };
 
+template <bool stable>
 struct AggregateFunctionDistinctGenericData {
     /// When creating, the hash table must be small.
-    using Set = HashSetWithStackMemory<StringRef, StringRefHash, 4>;
+    using Container = std::conditional_t<stable, 
phmap::flat_hash_map<StringRef, uint32_t>,
+                                         HashSetWithStackMemory<StringRef, 
StringRefHash, 4>>;
     using Self = AggregateFunctionDistinctGenericData;
-    Set set;
+    Container data;
 
     void merge(const Self& rhs, Arena* arena) {
-        Set::LookupResult it;
-        bool inserted;
-        for (const auto& elem : rhs.set) {
-            StringRef key = elem.get_value();
-            key.data = arena->insert(key.data, key.size);
-            set.emplace(key, it, inserted);
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            typename Container::LookupResult it;
+            bool inserted;
+            for (const auto& elem : rhs.data) {
+                StringRef key = elem.get_value();
+                key.data = arena->insert(key.data, key.size);
+                data.emplace(key, it, inserted);
+            }
         }
     }
 
     void serialize(BufferWritable& buf) const {
-        write_var_uint(set.size(), buf);
-        for (const auto& elem : set) {
-            write_string_binary(elem.get_value(), buf);
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            write_var_uint(data.size(), buf);
+            for (const auto& elem : data) {
+                write_string_binary(elem.get_value(), buf);
+            }
         }
     }
 
     void deserialize(BufferReadable& buf, Arena* arena) {
-        UInt64 size;
-        read_var_uint(size, buf);
-
-        StringRef ref;
-        for (size_t i = 0; i < size; ++i) {
-            read_string_binary(ref, buf);
-            set.insert(ref);
+        DCHECK(!stable);
+        if constexpr (!stable) {
+            UInt64 size;
+            read_var_uint(size, buf);
+
+            StringRef ref;
+            for (size_t i = 0; i < size; ++i) {
+                read_string_binary(ref, buf);
+                data.insert(ref);
+            }
         }
     }
 };
 
-template <bool is_plain_column>
-struct AggregateFunctionDistinctSingleGenericData : public 
AggregateFunctionDistinctGenericData {
+template <bool stable>
+struct AggregateFunctionDistinctSingleGenericData
+        : public AggregateFunctionDistinctGenericData<stable> {
+    using Base = AggregateFunctionDistinctGenericData<stable>;
+    using Base::data;
     void add(const IColumn** columns, size_t /* columns_num */, size_t 
row_num, Arena* arena) {
-        Set::LookupResult it;
-        bool inserted;
         auto key = columns[0]->get_data_at(row_num);
         key.data = arena->insert(key.data, key.size);
-        set.emplace(key, it, inserted);
+
+        if constexpr (stable) {
+            data.emplace(key, data.size());
+        } else {
+            typename Base::Container::LookupResult it;
+            bool inserted;
+            data.emplace(key, it, inserted);
+        }
     }
 
     MutableColumns get_arguments(const DataTypes& argument_types) const {
         MutableColumns argument_columns;
         argument_columns.emplace_back(argument_types[0]->create_column());
-        for (const auto& elem : set) {
-            argument_columns[0]->insert_data(elem.get_value().data, 
elem.get_value().size);
+        if constexpr (stable) {
+            std::vector<StringRef> tmp(data.size());
+            for (auto it : data) {
+                tmp[it.second] = it.first;
+            }
+            for (int i = 0; i < data.size(); i++) {
+                argument_columns[0]->insert_data(tmp[i].data, tmp[i].size);
+            }
+        } else {
+            for (const auto& elem : data) {
+                argument_columns[0]->insert_data(elem.get_value().data, 
elem.get_value().size);
+            }
         }
 
         return argument_columns;
     }
 };
 
-struct AggregateFunctionDistinctMultipleGenericData : public 
AggregateFunctionDistinctGenericData {
+template <bool stable>
+struct AggregateFunctionDistinctMultipleGenericData
+        : public AggregateFunctionDistinctGenericData<stable> {
+    using Base = AggregateFunctionDistinctGenericData<stable>;
+    using Base::data;
     void add(const IColumn** columns, size_t columns_num, size_t row_num, 
Arena* arena) {
         const char* begin = nullptr;
-        StringRef value(begin, 0);
+        StringRef key(begin, 0);
         for (size_t i = 0; i < columns_num; ++i) {
             auto cur_ref = columns[i]->serialize_value_into_arena(row_num, 
*arena, begin);
-            value.data = cur_ref.data - value.size;
-            value.size += cur_ref.size;
+            key.data = cur_ref.data - key.size;
+            key.size += cur_ref.size;
         }
 
-        Set::LookupResult it;
-        bool inserted;
-        value.data = arena->insert(value.data, value.size);
-        set.emplace(value, it, inserted);
+        if constexpr (stable) {
+            data.emplace(key, data.size());
+        } else {
+            typename Base::Container::LookupResult it;
+            bool inserted;
+            data.emplace(key, it, inserted);
+        }
     }
 
     MutableColumns get_arguments(const DataTypes& argument_types) const {
@@ -161,10 +228,23 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
             argument_columns[i] = argument_types[i]->create_column();
         }
 
-        for (const auto& elem : set) {
-            const char* begin = elem.get_value().data;
-            for (auto& column : argument_columns) {
-                begin = column->deserialize_and_insert_from_arena(begin);
+        if constexpr (stable) {
+            std::vector<StringRef> tmp(data.size());
+            for (auto it : data) {
+                tmp[it.second] = it.first;
+            }
+            for (int i = 0; i < data.size(); i++) {
+                const char* begin = tmp[i].data;
+                for (auto& column : argument_columns) {
+                    begin = column->deserialize_and_insert_from_arena(begin);
+                }
+            }
+        } else {
+            for (const auto& elem : data) {
+                const char* begin = elem.get_value().data;
+                for (auto& column : argument_columns) {
+                    begin = column->deserialize_and_insert_from_arena(begin);
+                }
             }
         }
 
@@ -175,9 +255,10 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
 /** Adaptor for aggregate functions.
   * Adding -Distinct suffix to aggregate function
 **/
-template <typename Data>
+template <template <bool stable> typename Data, bool stable = false>
 class AggregateFunctionDistinct
-        : public IAggregateFunctionDataHelper<Data, 
AggregateFunctionDistinct<Data>> {
+        : public IAggregateFunctionDataHelper<Data<stable>,
+                                              AggregateFunctionDistinct<Data, 
stable>> {
 private:
     size_t prefix_size;
     AggregateFunctionPtr nested_func;
@@ -193,12 +274,13 @@ private:
 
 public:
     AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const 
DataTypes& arguments)
-            : IAggregateFunctionDataHelper<Data, 
AggregateFunctionDistinct>(arguments),
-              nested_func(nested_func_),
+            : IAggregateFunctionDataHelper<Data<stable>, 
AggregateFunctionDistinct<Data, stable>>(
+                      arguments),
+              nested_func(std::move(nested_func_)),
               arguments_num(arguments.size()) {
         size_t nested_size = nested_func->align_of_data();
         CHECK_GT(nested_size, 0);
-        prefix_size = (sizeof(Data) + nested_size - 1) / nested_size * 
nested_size;
+        prefix_size = (sizeof(Data<stable>) + nested_size - 1) / nested_size * 
nested_size;
     }
 
     void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
@@ -221,7 +303,7 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) 
const override {
-        auto place = const_cast<AggregateDataPtr>(targetplace);
+        auto* place = const_cast<AggregateDataPtr>(targetplace);
         auto arguments = this->data(place).get_arguments(this->argument_types);
         ColumnRawPtrs arguments_raw(arguments.size());
         for (size_t i = 0; i < arguments.size(); ++i) {
@@ -229,11 +311,9 @@ public:
         }
 
         assert(!arguments.empty());
-        // nested_func->add_batch_single_place(arguments[0]->size(), 
get_nested_place(place), arguments_raw.data(), arena);
-        // nested_func->insert_result_into(get_nested_place(place), to, arena);
-
+        Arena arena;
         nested_func->add_batch_single_place(arguments[0]->size(), 
get_nested_place(place),
-                                            arguments_raw.data(), nullptr);
+                                            arguments_raw.data(), &arena);
         nested_func->insert_result_into(get_nested_place(place), to);
     }
 
@@ -242,12 +322,13 @@ public:
     size_t align_of_data() const override { return 
nested_func->align_of_data(); }
 
     void create(AggregateDataPtr __restrict place) const override {
-        new (place) Data;
-        SAFE_CREATE(nested_func->create(get_nested_place(place)), 
this->data(place).~Data());
+        new (place) Data<stable>;
+        SAFE_CREATE(nested_func->create(get_nested_place(place)),
+                    this->data(place).~Data<stable>());
     }
 
     void destroy(AggregateDataPtr __restrict place) const noexcept override {
-        this->data(place).~Data();
+        this->data(place).~Data<stable>();
         nested_func->destroy(get_nested_place(place));
     }
 
@@ -256,6 +337,11 @@ public:
     DataTypePtr get_return_type() const override { return 
nested_func->get_return_type(); }
 
     bool allocates_memory_in_arena() const override { return true; }
+
+    AggregateFunctionPtr transmit_to_stable() override {
+        return AggregateFunctionPtr(new AggregateFunctionDistinct<Data, true>(
+                nested_func, IAggregateFunction::argument_types));
+    }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_foreach.h b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
index 039c2d507b8..bef52a906cf 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_foreach.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_foreach.h
@@ -223,8 +223,6 @@ public:
         return nested_function->allocates_memory_in_arena();
     }
 
-    bool is_state() const override { return nested_function->is_state(); }
-
     void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
              Arena* arena) const override {
         const IColumn* nested[num_arguments];
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h
index a91a172fc05..59854c2a2b4 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -180,8 +180,6 @@ public:
     bool allocates_memory_in_arena() const override {
         return nested_function->allocates_memory_in_arena();
     }
-
-    bool is_state() const override { return nested_function->is_state(); }
 };
 
 /** There are two cases: for single argument and variadic.
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index d95d0ce6ccb..cbae8cd28fe 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -26,7 +26,6 @@
 
 namespace doris::vectorized {
 
-void 
register_aggregate_function_combinator_sort(AggregateFunctionSimpleFactory& 
factory);
 void 
register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFactory& 
factory);
 void 
register_aggregate_function_combinator_foreach(AggregateFunctionSimpleFactory& 
factory);
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 07b57e41359..145a07d5446 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -126,13 +126,16 @@ private:
     }
 
 public:
-    AggregateFunctionSort(const AggregateFunctionPtr& nested_func, const 
DataTypes& arguments,
+    AggregateFunctionSort(AggregateFunctionPtr nested_func, const DataTypes& 
arguments,
                           const SortDescription& sort_desc, const 
RuntimeState* state)
             : IAggregateFunctionDataHelper<Data, 
AggregateFunctionSort>(arguments),
-              _nested_func(nested_func),
+              _nested_func(std::move(nested_func)),
               _arguments(arguments),
               _sort_desc(sort_desc),
               _state(state) {
+        if (auto f = _nested_func->transmit_to_stable(); f) {
+            _nested_func = f;
+        }
         for (const auto& type : _arguments) {
             _block.insert({type, ""});
         }
@@ -158,7 +161,8 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) 
const override {
-        auto place = const_cast<AggregateDataPtr>(targetplace);
+        auto* place = const_cast<AggregateDataPtr>(targetplace);
+        Arena arena;
         if (!this->data(place).block.is_empty_column()) {
             this->data(place).sort();
 
@@ -167,9 +171,10 @@ public:
                 arguments_nested.emplace_back(
                         
this->data(place).block.get_by_position(i).column.get());
             }
+
             _nested_func->add_batch_single_place(arguments_nested[0]->size(),
                                                  get_nested_place(place), 
arguments_nested.data(),
-                                                 nullptr);
+                                                 &arena);
         }
 
         _nested_func->insert_result_into(get_nested_place(place), to);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to