This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch opt_perf
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/opt_perf by this push: new 983abe90fd [improvement](agg) speed up merge agg results (#12939) 983abe90fd is described below commit 983abe90fd7813ea7146022351303bdb05415889 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Sat Sep 24 14:43:19 2022 +0800 [improvement](agg) speed up merge agg results (#12939) --- be/src/exec/olap_common.h | 12 ++-- .../vec/aggregate_functions/aggregate_function.h | 20 ++++++ .../aggregate_functions/aggregate_function_avg.h | 13 ++++ .../aggregate_functions/aggregate_function_count.h | 18 ++++++ .../aggregate_function_min_max.h | 15 +++++ .../aggregate_functions/aggregate_function_sum.h | 10 +++ .../aggregate_functions/aggregate_function_uniq.h | 71 ++++++++++++++++++---- be/src/vec/common/sort/sorter.cpp | 6 +- be/src/vec/exec/vaggregation_node.h | 34 ++++++----- 9 files changed, 165 insertions(+), 34 deletions(-) diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index 87cac1e4ac..7a339187fa 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -571,27 +571,27 @@ void ColumnValueRange<primitive_type>::convert_to_fixed_value() { } template <> -[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_STRING>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_STRING>> ColumnValueRange<PrimitiveType::TYPE_STRING>::split(size_t count); template <> -[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_CHAR>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_CHAR>> ColumnValueRange<PrimitiveType::TYPE_CHAR>::split(size_t count); template <> -[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_VARCHAR>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_VARCHAR>> ColumnValueRange<PrimitiveType::TYPE_VARCHAR>::split(size_t count); template <> -[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_HLL>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_HLL>> ColumnValueRange<PrimitiveType::TYPE_HLL>::split(size_t count); template <> 
-[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>> ColumnValueRange<PrimitiveType::TYPE_DECIMALV2>::split(size_t count); template <> -[[noreturn]] std::vector<ColumnValueRange<PrimitiveType::TYPE_LARGEINT>> +std::vector<ColumnValueRange<PrimitiveType::TYPE_LARGEINT>> ColumnValueRange<PrimitiveType::TYPE_LARGEINT>::split(size_t count); template <PrimitiveType primitive_type> diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 6309b0c994..2a7eecd2f3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -144,6 +144,11 @@ public: virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, Arena* arena) const = 0; + virtual void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, + const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const = 0; + /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). 
virtual bool allocates_memory_in_arena() const { return false; } @@ -347,6 +352,21 @@ public: deserialize_vec(places, assert_cast<const ColumnString*>(&column), arena, num_rows); } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); + char tmp[size_of_data]; + for (size_t i = 0; i != num_rows; ++i) { + VectorBufferReader buffer_reader( + assert_cast<const ColumnString&>(column).get_data_at(i)); + static_cast<const Derived*>(this)->create(tmp); + static_cast<const Derived*>(this)->deserialize(tmp, buffer_reader, arena); + static_cast<const Derived*>(this)->merge(places[i] + offset, tmp, arena); + static_cast<const Derived*>(this)->destroy(tmp); + } + } + void merge_vec(const AggregateDataPtr* places, size_t offset, ConstAggregateDataPtr rhs, Arena* arena, const size_t num_rows) const override { const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index c80c46b8f9..04a8cb7ca3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -151,6 +151,19 @@ public: memcpy(places, data, sizeof(Data) * num_rows); } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto& col = assert_cast<const ColumnFixedLengthObject&>(column); + DCHECK(col.size() >= num_rows) << "source column's size should greater than num_rows"; + auto* src_data = reinterpret_cast<const Data*>(col.get_data().data()); + for (size_t i = 0; i != num_rows; ++i) { + auto& data = this->data(places[i] + offset); + data.sum += src_data[i].sum; + data.count += 
src_data[i].count; + } + } + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { auto& col = assert_cast<ColumnFixedLengthObject&>(*dst); diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h index 8ab6b53e56..3b8e716499 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count.h @@ -81,6 +81,15 @@ public: } } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + reinterpret_cast<Data*>(places[i] + offset)->count += data[i]; + } + } + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { auto& col = assert_cast<ColumnUInt64&>(*dst); @@ -175,6 +184,15 @@ public: } } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + reinterpret_cast<Data*>(places[i] + offset)->count += data[i]; + } + } + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { auto& col = assert_cast<ColumnUInt64&>(*dst); diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h index 37e812bf38..5abdeab288 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h +++ 
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h @@ -487,6 +487,21 @@ public: } } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + if constexpr (Data::IsFixedLength) { + const auto& col = static_cast<const ColumnFixedLengthObject&>(column); + auto* column_data = reinterpret_cast<const Data*>(col.get_data().data()); + for (size_t i = 0; i != num_rows; ++i) { + *reinterpret_cast<Data*>(places[i] + offset) = column_data[i]; + } + } else { + Base::deserialize_and_merge_with_keys_from_column(places, offset, column, arena, + num_rows); + } + } + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { if constexpr (Data::IsFixedLength) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h index 46ca85f3a9..b2f053f8a5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h @@ -110,6 +110,16 @@ public: } } + void deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColVecResult&>(column).get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + auto* dst_data = reinterpret_cast<Data*>(places[i] + offset); + dst_data->sum += data[i]; + } + } + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { auto& col = assert_cast<ColVecResult&>(*dst); diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h index cb75117794..90caeb5d68 100644 --- 
a/be/src/vec/aggregate_functions/aggregate_function_uniq.h +++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h @@ -21,6 +21,7 @@ #pragma once #include <parallel_hashmap/phmap.h> +#include <parallel_hashmap/phmap_dump.h> #include <type_traits> @@ -39,6 +40,46 @@ namespace doris::vectorized { // Here is an empirical value. static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; +template <typename WritableBuffer> +class BinaryOutputInMemory { +public: + BinaryOutputInMemory(WritableBuffer& buffer) : _buffer(buffer) {} + + bool dump(const char* p, size_t sz) { + _buffer.write(p, sz); + return true; + } + + template <typename V> + bool dump(const V& v) { + write_pod_binary(v, _buffer); + return true; + } + +private: + WritableBuffer& _buffer; +}; + +template <typename ReadableBuffer> +class BinaryInputInMemory { +public: + BinaryInputInMemory(ReadableBuffer& buffer) : _buffer(buffer) {} + + bool load(char* p, size_t sz) { + _buffer.read(p, sz); + return true; + } + + template <typename V> + bool load(V* v) { + read_pod_binary(v, _buffer); + return true; + } + +private: + ReadableBuffer& _buffer; +}; + /// uniqExact template <typename T> @@ -168,24 +209,30 @@ public: void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { auto& set = this->data(place).set; - write_var_uint(set.size(), buf); - for (const auto& elem : set) { - write_pod_binary(elem, buf); - } + BinaryOutputInMemory output(buf); + set.dump(output); } void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const override { auto& set = this->data(place).set; - size_t size; - read_var_uint(size, buf); - - set.rehash(size + set.size()); + BinaryInputInMemory input(buf); + if (set.size() == 0) { + set.load(input); + } else { + Data src; + src.set.load(input); + set.merge(src.set); + } + } - for (size_t i = 0; i < size; ++i) { - KeyType ref; - read_pod_binary(ref, buf); - set.insert(ref); + void 
deserialize_and_merge_with_keys_from_column(AggregateDataPtr* places, const size_t offset, + const IColumn& column, Arena* arena, + size_t num_rows) const override { + const auto& col = static_cast<const ColumnString&>(column); + for (size_t i = 0; i != num_rows; ++i) { + VectorBufferReader buffer_reader(col.get_data_at(i)); + deserialize_and_merge(places[i] + offset, buffer_reader, arena); } } diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 4df6a6abc7..c766f13f74 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -93,9 +93,11 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) { for (int i = 0; i < _sort_description.size(); i++) { const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i]; if (materialized) { - RETURN_IF_ERROR(ordering_expr->execute(&src_block, &_sort_description[i].column_number)); + RETURN_IF_ERROR( + ordering_expr->execute(&src_block, &_sort_description[i].column_number)); } else { - RETURN_IF_ERROR(ordering_expr->execute(&dest_block, &_sort_description[i].column_number)); + RETURN_IF_ERROR( + ordering_expr->execute(&dest_block, &_sort_description[i].column_number)); } _sort_description[i].direction = _is_asc_order[i] ? 
1 : -1; diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 7b41b570c3..0952ccc15a 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -978,28 +978,34 @@ private: column = ((ColumnNullable*)column.get())->get_nested_column_ptr(); } - size_t buffer_size = - _aggregate_evaluators[i]->function()->size_of_data() * rows; - if (_deserialize_buffer.size() < buffer_size) { - _deserialize_buffer.resize(buffer_size); - } - if (_use_fixed_length_serialization_opt) { SCOPED_TIMER(_deserialize_data_timer); - _aggregate_evaluators[i]->function()->deserialize_from_column( - _deserialize_buffer.data(), *column, &_agg_arena_pool, rows); + // _aggregate_evaluators[i]->function()->deserialize_from_column( + // _deserialize_buffer.data(), *column, &_agg_arena_pool, rows); + + _aggregate_evaluators[i] + ->function() + ->deserialize_and_merge_with_keys_from_column( + _places.data(), _offsets_of_aggregate_states[i], *column, + &_agg_arena_pool, rows); } else { SCOPED_TIMER(_deserialize_data_timer); + + size_t buffer_size = + _aggregate_evaluators[i]->function()->size_of_data() * rows; + if (_deserialize_buffer.size() < buffer_size) { + _deserialize_buffer.resize(buffer_size); + } _aggregate_evaluators[i]->function()->deserialize_vec( _deserialize_buffer.data(), (ColumnString*)(column.get()), &_agg_arena_pool, rows); - } - _aggregate_evaluators[i]->function()->merge_vec( - _places.data(), _offsets_of_aggregate_states[i], - _deserialize_buffer.data(), &_agg_arena_pool, rows); + _aggregate_evaluators[i]->function()->merge_vec( + _places.data(), _offsets_of_aggregate_states[i], + _deserialize_buffer.data(), &_agg_arena_pool, rows); - _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(), - rows); + _aggregate_evaluators[i]->function()->destroy_vec( + _deserialize_buffer.data(), rows); + } } else { _aggregate_evaluators[i]->execute_batch_add(block, 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org