This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 86ef0069ea8 [Feature](function) support group concat with distinct and order by (#38851) 86ef0069ea8 is described below commit 86ef0069ea85ee32f5d73317f86ce76f33974b11 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Aug 5 15:44:51 2024 +0800 [Feature](function) support group concat with distinct and order by (#38851) pick from #38744 and #38776 --- .../vec/aggregate_functions/aggregate_function.h | 11 +- .../aggregate_function_distinct.cpp | 25 +-- .../aggregate_function_distinct.h | 206 +++++++++++++++------ .../aggregate_function_foreach.h | 2 - .../aggregate_functions/aggregate_function_null.h | 2 - .../aggregate_function_simple_factory.cpp | 1 - .../aggregate_functions/aggregate_function_sort.h | 13 +- 7 files changed, 173 insertions(+), 87 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index c74e22bdbcd..74700dff17f 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -43,6 +43,8 @@ class AggregateFunctionBitmapCount; template <typename Op> class AggregateFunctionBitmapOp; struct AggregateFunctionBitmapUnionOp; +class IAggregateFunction; +using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; using DataTypePtr = std::shared_ptr<const IDataType>; using DataTypes = std::vector<DataTypePtr>; @@ -178,11 +180,6 @@ public: const size_t offset, IColumn& to, const size_t num_rows) const = 0; - /** Returns true for aggregate functions of type -State. - * They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another). - */ - virtual bool is_state() const { return false; } - /** Contains a loop with calls to "add" function. You can collect arguments into array "places" * and do a single call to "add_batch" for devirtualization and inlining. 
*/ @@ -223,6 +220,8 @@ public: virtual void set_version(const int version_) { version = version_; } + virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; } + protected: DataTypes argument_types; int version {}; @@ -519,8 +518,6 @@ public: } }; -using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; - class AggregateFunctionGuard { public: using AggregateData = std::remove_pointer_t<AggregateDataPtr>; diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp index 5b2269a27d9..4773a620e0a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp @@ -29,6 +29,16 @@ namespace doris::vectorized { +template <typename T> +struct Reducer { + template <bool stable> + using Output = AggregateFunctionDistinctSingleNumericData<T, stable>; + using AggregateFunctionDistinctNormal = AggregateFunctionDistinct<Output, false>; +}; + +template <typename T> +using AggregateFunctionDistinctNumeric = Reducer<T>::AggregateFunctionDistinctNormal; + class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator { public: String get_name() const override { return "Distinct"; } @@ -51,22 +61,15 @@ public: if (arguments.size() == 1) { AggregateFunctionPtr res( - creator_with_numeric_type::create<AggregateFunctionDistinct, - AggregateFunctionDistinctSingleNumericData>( + creator_with_numeric_type::create<AggregateFunctionDistinctNumeric>( arguments, result_is_nullable, nested_function)); if (res) { return res; } - if (arguments[0]->is_value_unambiguously_represented_in_contiguous_memory_region()) { - res = creator_without_type::create<AggregateFunctionDistinct< - AggregateFunctionDistinctSingleGenericData<true>>>( - arguments, result_is_nullable, nested_function); - } else { - res = creator_without_type::create<AggregateFunctionDistinct< - 
AggregateFunctionDistinctSingleGenericData<false>>>( - arguments, result_is_nullable, nested_function); - } + res = creator_without_type::create< + AggregateFunctionDistinct<AggregateFunctionDistinctSingleGenericData>>( + arguments, result_is_nullable, nested_function); return res; } return creator_without_type::create< diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h index c0c7a5b66dd..4f42e8509f2 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h @@ -28,6 +28,8 @@ #include <memory> #include <new> #include <string> +#include <type_traits> +#include <utility> #include <vector> #include "vec/aggregate_functions/aggregate_function.h" @@ -54,105 +56,170 @@ struct DefaultHash; namespace doris::vectorized { -template <typename T> +template <typename T, bool stable> struct AggregateFunctionDistinctSingleNumericData { /// When creating, the hash table must be small. 
- using Set = HashSetWithStackMemory<T, DefaultHash<T>, 4>; - using Self = AggregateFunctionDistinctSingleNumericData<T>; - Set set; + using Container = std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>, + HashSetWithStackMemory<T, DefaultHash<T>, 4>>; + using Self = AggregateFunctionDistinctSingleNumericData<T, stable>; + Container data; void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena*) { const auto& vec = assert_cast<const ColumnVector<T>&>(*columns[0]).get_data(); - set.insert(vec[row_num]); + if constexpr (stable) { + data.emplace(vec[row_num], data.size()); + } else { + data.insert(vec[row_num]); + } } - void merge(const Self& rhs, Arena*) { set.merge(rhs.set); } + void merge(const Self& rhs, Arena*) { + DCHECK(!stable); + if constexpr (!stable) { + data.merge(rhs.data); + } + } - void serialize(BufferWritable& buf) const { set.write(buf); } + void serialize(BufferWritable& buf) const { + DCHECK(!stable); + if constexpr (!stable) { + data.write(buf); + } + } - void deserialize(BufferReadable& buf, Arena*) { set.read(buf); } + void deserialize(BufferReadable& buf, Arena*) { + DCHECK(!stable); + if constexpr (!stable) { + data.read(buf); + } + } MutableColumns get_arguments(const DataTypes& argument_types) const { MutableColumns argument_columns; argument_columns.emplace_back(argument_types[0]->create_column()); - for (const auto& elem : set) { - argument_columns[0]->insert(elem.get_value()); + + if constexpr (stable) { + argument_columns[0]->resize(data.size()); + auto ptr = (T*)const_cast<char*>(argument_columns[0]->get_raw_data().data); + for (auto it : data) { + ptr[it.second] = it.first; + } + } else { + for (const auto& elem : data) { + argument_columns[0]->insert(elem.get_value()); + } } return argument_columns; } }; +template <bool stable> struct AggregateFunctionDistinctGenericData { /// When creating, the hash table must be small. 
- using Set = HashSetWithStackMemory<StringRef, StringRefHash, 4>; + using Container = std::conditional_t<stable, phmap::flat_hash_map<StringRef, uint32_t>, + HashSetWithStackMemory<StringRef, StringRefHash, 4>>; using Self = AggregateFunctionDistinctGenericData; - Set set; + Container data; void merge(const Self& rhs, Arena* arena) { - Set::LookupResult it; - bool inserted; - for (const auto& elem : rhs.set) { - StringRef key = elem.get_value(); - key.data = arena->insert(key.data, key.size); - set.emplace(key, it, inserted); + DCHECK(!stable); + if constexpr (!stable) { + typename Container::LookupResult it; + bool inserted; + for (const auto& elem : rhs.data) { + StringRef key = elem.get_value(); + key.data = arena->insert(key.data, key.size); + data.emplace(key, it, inserted); + } } } void serialize(BufferWritable& buf) const { - write_var_uint(set.size(), buf); - for (const auto& elem : set) { - write_string_binary(elem.get_value(), buf); + DCHECK(!stable); + if constexpr (!stable) { + write_var_uint(data.size(), buf); + for (const auto& elem : data) { + write_string_binary(elem.get_value(), buf); + } } } void deserialize(BufferReadable& buf, Arena* arena) { - UInt64 size; - read_var_uint(size, buf); - - StringRef ref; - for (size_t i = 0; i < size; ++i) { - read_string_binary(ref, buf); - set.insert(ref); + DCHECK(!stable); + if constexpr (!stable) { + UInt64 size; + read_var_uint(size, buf); + + StringRef ref; + for (size_t i = 0; i < size; ++i) { + read_string_binary(ref, buf); + data.insert(ref); + } } } }; -template <bool is_plain_column> -struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData { +template <bool stable> +struct AggregateFunctionDistinctSingleGenericData + : public AggregateFunctionDistinctGenericData<stable> { + using Base = AggregateFunctionDistinctGenericData<stable>; + using Base::data; void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena* arena) { - Set::LookupResult 
it; - bool inserted; auto key = columns[0]->get_data_at(row_num); key.data = arena->insert(key.data, key.size); - set.emplace(key, it, inserted); + + if constexpr (stable) { + data.emplace(key, data.size()); + } else { + typename Base::Container::LookupResult it; + bool inserted; + data.emplace(key, it, inserted); + } } MutableColumns get_arguments(const DataTypes& argument_types) const { MutableColumns argument_columns; argument_columns.emplace_back(argument_types[0]->create_column()); - for (const auto& elem : set) { - argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size); + if constexpr (stable) { + std::vector<StringRef> tmp(data.size()); + for (auto it : data) { + tmp[it.second] = it.first; + } + for (int i = 0; i < data.size(); i++) { + argument_columns[0]->insert_data(tmp[i].data, tmp[i].size); + } + } else { + for (const auto& elem : data) { + argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size); + } } return argument_columns; } }; -struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData { +template <bool stable> +struct AggregateFunctionDistinctMultipleGenericData + : public AggregateFunctionDistinctGenericData<stable> { + using Base = AggregateFunctionDistinctGenericData<stable>; + using Base::data; void add(const IColumn** columns, size_t columns_num, size_t row_num, Arena* arena) { const char* begin = nullptr; - StringRef value(begin, 0); + StringRef key(begin, 0); for (size_t i = 0; i < columns_num; ++i) { auto cur_ref = columns[i]->serialize_value_into_arena(row_num, *arena, begin); - value.data = cur_ref.data - value.size; - value.size += cur_ref.size; + key.data = cur_ref.data - key.size; + key.size += cur_ref.size; } - Set::LookupResult it; - bool inserted; - value.data = arena->insert(value.data, value.size); - set.emplace(value, it, inserted); + if constexpr (stable) { + data.emplace(key, data.size()); + } else { + typename Base::Container::LookupResult 
it; + bool inserted; + data.emplace(key, it, inserted); + } } MutableColumns get_arguments(const DataTypes& argument_types) const { @@ -161,10 +228,23 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi argument_columns[i] = argument_types[i]->create_column(); } - for (const auto& elem : set) { - const char* begin = elem.get_value().data; - for (auto& column : argument_columns) { - begin = column->deserialize_and_insert_from_arena(begin); + if constexpr (stable) { + std::vector<StringRef> tmp(data.size()); + for (auto it : data) { + tmp[it.second] = it.first; + } + for (int i = 0; i < data.size(); i++) { + const char* begin = tmp[i].data; + for (auto& column : argument_columns) { + begin = column->deserialize_and_insert_from_arena(begin); + } + } + } else { + for (const auto& elem : data) { + const char* begin = elem.get_value().data; + for (auto& column : argument_columns) { + begin = column->deserialize_and_insert_from_arena(begin); + } } } @@ -175,9 +255,10 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi /** Adaptor for aggregate functions. 
* Adding -Distinct suffix to aggregate function **/ -template <typename Data> +template <template <bool stable> typename Data, bool stable = false> class AggregateFunctionDistinct - : public IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct<Data>> { + : public IAggregateFunctionDataHelper<Data<stable>, + AggregateFunctionDistinct<Data, stable>> { private: size_t prefix_size; AggregateFunctionPtr nested_func; @@ -193,12 +274,13 @@ private: public: AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes& arguments) - : IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct>(arguments), - nested_func(nested_func_), + : IAggregateFunctionDataHelper<Data<stable>, AggregateFunctionDistinct<Data, stable>>( + arguments), + nested_func(std::move(nested_func_)), arguments_num(arguments.size()) { size_t nested_size = nested_func->align_of_data(); CHECK_GT(nested_size, 0); - prefix_size = (sizeof(Data) + nested_size - 1) / nested_size * nested_size; + prefix_size = (sizeof(Data<stable>) + nested_size - 1) / nested_size * nested_size; } void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, @@ -221,7 +303,7 @@ public: } void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) const override { - auto place = const_cast<AggregateDataPtr>(targetplace); + auto* place = const_cast<AggregateDataPtr>(targetplace); auto arguments = this->data(place).get_arguments(this->argument_types); ColumnRawPtrs arguments_raw(arguments.size()); for (size_t i = 0; i < arguments.size(); ++i) { @@ -229,11 +311,9 @@ public: } assert(!arguments.empty()); - // nested_func->add_batch_single_place(arguments[0]->size(), get_nested_place(place), arguments_raw.data(), arena); - // nested_func->insert_result_into(get_nested_place(place), to, arena); - + Arena arena; nested_func->add_batch_single_place(arguments[0]->size(), get_nested_place(place), - arguments_raw.data(), nullptr); + arguments_raw.data(), &arena); 
nested_func->insert_result_into(get_nested_place(place), to); } @@ -242,12 +322,13 @@ public: size_t align_of_data() const override { return nested_func->align_of_data(); } void create(AggregateDataPtr __restrict place) const override { - new (place) Data; - SAFE_CREATE(nested_func->create(get_nested_place(place)), this->data(place).~Data()); + new (place) Data<stable>; + SAFE_CREATE(nested_func->create(get_nested_place(place)), + this->data(place).~Data<stable>()); } void destroy(AggregateDataPtr __restrict place) const noexcept override { - this->data(place).~Data(); + this->data(place).~Data<stable>(); nested_func->destroy(get_nested_place(place)); } @@ -256,6 +337,11 @@ public: DataTypePtr get_return_type() const override { return nested_func->get_return_type(); } bool allocates_memory_in_arena() const override { return true; } + + AggregateFunctionPtr transmit_to_stable() override { + return AggregateFunctionPtr(new AggregateFunctionDistinct<Data, true>( + nested_func, IAggregateFunction::argument_types)); + } }; } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_foreach.h b/be/src/vec/aggregate_functions/aggregate_function_foreach.h index 039c2d507b8..bef52a906cf 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_foreach.h +++ b/be/src/vec/aggregate_functions/aggregate_function_foreach.h @@ -223,8 +223,6 @@ public: return nested_function->allocates_memory_in_arena(); } - bool is_state() const override { return nested_function->is_state(); } - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, Arena* arena) const override { const IColumn* nested[num_arguments]; diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index a91a172fc05..59854c2a2b4 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -180,8 +180,6 @@ 
public: bool allocates_memory_in_arena() const override { return nested_function->allocates_memory_in_arena(); } - - bool is_state() const override { return nested_function->is_state(); } }; /** There are two cases: for single argument and variadic. diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp index d95d0ce6ccb..cbae8cd28fe 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp @@ -26,7 +26,6 @@ namespace doris::vectorized { -void register_aggregate_function_combinator_sort(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFactory& factory); void register_aggregate_function_combinator_foreach(AggregateFunctionSimpleFactory& factory); diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index 07b57e41359..145a07d5446 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -126,13 +126,16 @@ private: } public: - AggregateFunctionSort(const AggregateFunctionPtr& nested_func, const DataTypes& arguments, + AggregateFunctionSort(AggregateFunctionPtr nested_func, const DataTypes& arguments, const SortDescription& sort_desc, const RuntimeState* state) : IAggregateFunctionDataHelper<Data, AggregateFunctionSort>(arguments), - _nested_func(nested_func), + _nested_func(std::move(nested_func)), _arguments(arguments), _sort_desc(sort_desc), _state(state) { + if (auto f = _nested_func->transmit_to_stable(); f) { + _nested_func = f; + } for (const auto& type : _arguments) { _block.insert({type, ""}); } @@ -158,7 +161,8 @@ public: } void insert_result_into(ConstAggregateDataPtr targetplace, IColumn& to) const override { - auto place = 
const_cast<AggregateDataPtr>(targetplace); + auto* place = const_cast<AggregateDataPtr>(targetplace); + Arena arena; if (!this->data(place).block.is_empty_column()) { this->data(place).sort(); @@ -167,9 +171,10 @@ public: arguments_nested.emplace_back( this->data(place).block.get_by_position(i).column.get()); } + _nested_func->add_batch_single_place(arguments_nested[0]->size(), get_nested_place(place), arguments_nested.data(), - nullptr); + &arena); } _nested_func->insert_result_into(get_nested_place(place), to); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org