This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b7c9007776 [improvement][agg]Process aggregated results in the vectorized way (#11084) b7c9007776 is described below commit b7c9007776cf9b8f9db343d41b164cc6760f0975 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Jul 22 22:04:43 2022 +0800 [improvement][agg]Process aggregated results in the vectorized way (#11084) --- .../vec/aggregate_functions/aggregate_function.h | 48 +++++++++++++ be/src/vec/columns/column.h | 21 ++++++ be/src/vec/columns/column_decimal.cpp | 22 ++++++ be/src/vec/columns/column_decimal.h | 12 ++++ be/src/vec/columns/column_nullable.cpp | 29 ++++++++ be/src/vec/columns/column_nullable.h | 10 +++ be/src/vec/columns/column_string.cpp | 21 ++++++ be/src/vec/columns/column_string.h | 25 +++++++ be/src/vec/columns/column_vector.cpp | 22 ++++++ be/src/vec/columns/column_vector.h | 13 ++++ be/src/vec/exec/vaggregation_node.cpp | 81 ++++++++++++---------- be/src/vec/exec/vaggregation_node.h | 39 +++++++++++ be/src/vec/exprs/vectorized_agg_fn.cpp | 5 ++ be/src/vec/exprs/vectorized_agg_fn.h | 3 + 14 files changed, 316 insertions(+), 35 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index eca81b3199..677c189002 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -91,19 +91,32 @@ public: virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena* arena) const = 0; + virtual void merge_vec(const AggregateDataPtr* places, size_t offset, ConstAggregateDataPtr rhs, + Arena* arena, const size_t num_rows) const = 0; + /// Serializes state (to transmit it over the network, for example). 
virtual void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const = 0; + virtual void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t offset, + BufferWritable& buf, const size_t num_rows) const = 0; + /// Deserializes state. This function is called only for empty (just created) states. virtual void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const = 0; + virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, + size_t num_rows) const = 0; + /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). virtual bool allocates_memory_in_arena() const { return false; } /// Inserts results into a column. virtual void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const = 0; + virtual void insert_result_into_vec(const std::vector<AggregateDataPtr>& places, + const size_t offset, IColumn& to, + const size_t num_rows) const = 0; + /** Returns true for aggregate functions of type -State. * They are executed as other aggregate functions, but not finalized (return an aggregation state that can be combined with another). 
*/ @@ -176,6 +189,41 @@ public: static_cast<const Derived*>(this)->add(place, columns, i, arena); } } + + void insert_result_into_vec(const std::vector<AggregateDataPtr>& places, const size_t offset, + IColumn& to, const size_t num_rows) const override { + for (size_t i = 0; i != num_rows; ++i) { + static_cast<const Derived*>(this)->insert_result_into(places[i] + offset, to); + } + } + + void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t offset, + BufferWritable& buf, const size_t num_rows) const override { + for (size_t i = 0; i != num_rows; ++i) { + static_cast<const Derived*>(this)->serialize(places[i] + offset, buf); + buf.commit(); + } + } + + void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, + size_t num_rows) const override { + const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + auto place = places + size_of_data * i; + VectorBufferReader buffer_reader(column->get_data_at(i)); + static_cast<const Derived*>(this)->create(place); + static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena); + } + } + + void merge_vec(const AggregateDataPtr* places, size_t offset, ConstAggregateDataPtr rhs, + Arena* arena, const size_t num_rows) const override { + const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + static_cast<const Derived*>(this)->merge(places[i] + offset, rhs + size_of_data * i, + arena); + } + } }; /// Implements several methods for manipulation with data. T - type of structure with data for aggregation. 
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index b6c8f3e67c..006389ded2 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -204,6 +204,16 @@ public: LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name(); } + virtual void insert_many_strings(const StringRef* strings, size_t num) { + LOG(FATAL) << "Method insert_many_strings is not supported for " << get_name(); + } + + // Here `pos` points to memory whose data type is the same as the data type of the column. + // This function is used by `insert_keys_into_columns` in AggregationNode. + virtual void insert_many_raw_data(const char* pos, size_t num) { + LOG(FATAL) << "Method insert_many_raw_data is not supported for " << get_name(); + } + void insert_many_data(const char* pos, size_t length, size_t data_num) { for (size_t i = 0; i < data_num; ++i) { insert_data(pos, length); @@ -263,6 +273,17 @@ public: LOG(FATAL) << "serialize_vec_with_null_map not supported"; } + // This function deserializes group-by keys into column in the vectorized way. + virtual void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) { + LOG(FATAL) << "deserialize_vec not supported"; + } + + // Used in ColumnNullable::deserialize_vec + virtual void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows, + const uint8_t* null_map) { + LOG(FATAL) << "deserialize_vec_with_null_map not supported"; + } + /// Update state of hash function with value of n-th element. /// On subsequent calls of this method for sequence of column values of arbitrary types, /// passed bytes to hash must identify sequence of values unambiguously.
diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 315c1103c0..1b3c58a778 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -85,6 +85,28 @@ void ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, } } +template <typename T> +void ColumnDecimal<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) { + for (size_t i = 0; i < num_rows; ++i) { + keys[i].data = deserialize_and_insert_from_arena(keys[i].data); + keys[i].size -= sizeof(T); + } +} + +template <typename T> +void ColumnDecimal<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys, + const size_t num_rows, + const uint8_t* null_map) { + for (size_t i = 0; i < num_rows; ++i) { + if (null_map[i] == 0) { + keys[i].data = deserialize_and_insert_from_arena(keys[i].data); + keys[i].size -= sizeof(T); + } else { + insert_default(); + } + } +} + template <typename T> UInt64 ColumnDecimal<T>::get64(size_t n) const { if constexpr (sizeof(T) > sizeof(UInt64)) { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index d28e7ae469..e318271b13 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -114,6 +114,13 @@ public: } void insert_many_fix_len_data(const char* data_ptr, size_t num) override; + + void insert_many_raw_data(const char* pos, size_t num) override { + size_t old_size = data.size(); + data.resize(old_size + num); + memcpy(data.data() + old_size, pos, num * sizeof(T)); + } + void insert_data(const char* pos, size_t /*length*/) override; void insert_default() override { data.push_back(T()); } void insert(const Field& x) override { @@ -141,6 +148,11 @@ public: const uint8_t* null_map, size_t max_row_byte_size) const override; + void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override; + + void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t 
num_rows, + const uint8_t* null_map) override; + void update_hash_with_value(size_t n, SipHash& hash) const override; int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 982ddf9fdd..b8058a8d97 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -96,6 +96,20 @@ void ColumnNullable::insert_data(const char* pos, size_t length) { } } +void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) { + auto& nested_column = get_nested_column(); + auto& null_map_data = get_null_map_data(); + for (size_t i = 0; i != num; ++i) { + if (strings[i].data == nullptr) { + nested_column.insert_default(); + null_map_data.push_back(1); + } else { + nested_column.insert_data(strings[i].data, strings[i].size); + null_map_data.push_back(0); + } + } +} + StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const { const auto& arr = get_null_map_data(); @@ -152,6 +166,21 @@ void ColumnNullable::serialize_vec(std::vector<StringRef>& keys, size_t num_rows get_nested_column().serialize_vec_with_null_map(keys, num_rows, arr.data(), max_row_byte_size); } +void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) { + auto& arr = get_null_map_data(); + const size_t old_size = arr.size(); + arr.resize(old_size + num_rows); + + auto* null_map_data = &arr[old_size]; + for (size_t i = 0; i != num_rows; ++i) { + UInt8 val = *reinterpret_cast<const UInt8*>(keys[i].data); + null_map_data[i] = val; + keys[i].data += sizeof(val); + keys[i].size -= sizeof(val); + } + get_nested_column().deserialize_vec_with_null_map(keys, num_rows, null_map_data); +} + void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t length) { const
ColumnNullable& nullable_col = assert_cast<const ColumnNullable&>(src); get_null_map_column().insert_range_from(*nullable_col.null_map, start, length); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 1fa9a50e10..06e3c9fa8a 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -87,11 +87,16 @@ public: /// JOIN_NULL_HINT in null map means null is generated by join, only use in tuple is null void insert_join_null_data(); + void insert_many_strings(const StringRef* strings, size_t num) override; + StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; size_t get_max_row_byte_size() const override; void serialize_vec(std::vector<StringRef>& keys, size_t num_rows, size_t max_row_byte_size) const override; + + void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; @@ -107,6 +112,11 @@ public: get_nested_column().insert_many_fix_len_data(pos, num); } + void insert_many_raw_data(const char* pos, size_t num) override { + get_null_map_column().fill(0, num); + get_nested_column().insert_many_raw_data(pos, num); + } + void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict, size_t data_num, uint32_t dict_num) override { get_null_map_column().fill(0, data_num); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index e74ec544dc..3adf082ae0 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -228,6 +228,27 @@ void ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, siz } } +void 
ColumnString::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) { + for (size_t i = 0; i != num_rows; ++i) { + auto original_ptr = keys[i].data; + keys[i].data = deserialize_and_insert_from_arena(original_ptr); + keys[i].size -= keys[i].data - original_ptr; + } +} + +void ColumnString::deserialize_vec_with_null_map(std::vector<StringRef>& keys, + const size_t num_rows, const uint8_t* null_map) { + for (size_t i = 0; i != num_rows; ++i) { + if (null_map[i] == 0) { + auto original_ptr = keys[i].data; + keys[i].data = deserialize_and_insert_from_arena(original_ptr); + keys[i].size -= keys[i].data - original_ptr; + } else { + insert_default(); + } + } +} + template <typename Type> ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t limit) const { if (limit == 0) return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 6d7b55da2d..bbe6993ae2 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -180,6 +180,26 @@ public: } }; + void insert_many_strings(const StringRef* strings, size_t num) override { + size_t new_size = 0; + for (size_t i = 0; i < num; i++) { + new_size += strings[i].size + 1; + } + + const size_t old_size = chars.size(); + chars.resize(old_size + new_size); + + Char* data = chars.data(); + size_t offset = old_size; + for (size_t i = 0; i < num; i++) { + uint32_t len = strings[i].size; + if (len) memcpy(data + offset, strings[i].data, len); + data[offset + len] = 0; + offset += len + 1; + offsets.push_back(offset); + } + } + void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict, size_t num, uint32_t /*dict_num*/) override { for (size_t end_index = start_index + num; start_index < end_index; ++start_index) { @@ -208,6 +228,8 @@ public: const char* deserialize_and_insert_from_arena(const char* pos) override; + void deserialize_vec(std::vector<StringRef>& keys, const 
size_t num_rows) override; + size_t get_max_row_byte_size() const override; void serialize_vec(std::vector<StringRef>& keys, size_t num_rows, @@ -217,6 +239,9 @@ public: const uint8_t* null_map, size_t max_row_byte_size) const override; + void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows, + const uint8_t* null_map) override; + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 3a198fef20..c740a515a0 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -78,6 +78,28 @@ void ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys, } } +template <typename T> +void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) { + for (size_t i = 0; i != num_rows; ++i) { + keys[i].data = deserialize_and_insert_from_arena(keys[i].data); + keys[i].size -= sizeof(T); + } +} + +template <typename T> +void ColumnVector<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys, + const size_t num_rows, + const uint8_t* null_map) { + for (size_t i = 0; i < num_rows; ++i) { + if (null_map[i] == 0) { + keys[i].data = deserialize_and_insert_from_arena(keys[i].data); + keys[i].size -= sizeof(T); + } else { + insert_default(); + } + } +} + template <typename T> void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const { hash.update(data[n]); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 544cf8bc4a..c673f0b7bd 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -208,6 +208,14 @@ public: } } + void insert_many_raw_data(const char* pos, size_t num) override { + if constexpr (std::is_same_v<T, vectorized::Int128>) { + insert_many_in_copy_way(pos, num); + } else { + 
insert_many_default_type(pos, num); + } + } + void insert_default() override { data.push_back(T()); } void insert_many_defaults(size_t length) override { @@ -222,6 +230,11 @@ public: const char* deserialize_and_insert_from_arena(const char* pos) override; + void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) override; + + void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows, + const uint8_t* null_map) override; + size_t get_max_row_byte_size() const override; void serialize_vec(std::vector<StringRef>& keys, size_t num_rows, diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index c7539996c6..c5f3d7ddf9 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -959,16 +959,27 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo auto& data = agg_method.data; auto& iter = agg_method.iterator; agg_method.init_once(); - while (iter != data.end() && key_columns[0]->size() < state->batch_size()) { - const auto& key = iter->get_first(); - auto& mapped = iter->get_second(); - agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz); - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) - _aggregate_evaluators[i]->insert_result_info( - mapped + _offsets_of_aggregate_states[i], value_columns[i].get()); - + const auto size = std::min(data.size(), size_t(state->batch_size())); + using KeyType = std::decay_t<decltype(iter->get_first())>; + std::vector<KeyType> keys(size); + std::vector<AggregateDataPtr> values(size); + + size_t num_rows = 0; + while (iter != data.end() && num_rows < state->batch_size()) { + keys[num_rows] = iter->get_first(); + values[num_rows] = iter->get_second(); ++iter; + ++num_rows; + } + + agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz); + + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + 
_aggregate_evaluators[i]->insert_result_info_vec( + values, _offsets_of_aggregate_states[i], value_columns[i].get(), + num_rows); } + if (iter == data.end()) { if (agg_method.data.has_null_key_data()) { // only one key of group by support wrap null key @@ -1043,19 +1054,26 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat agg_method.init_once(); auto& data = agg_method.data; auto& iter = agg_method.iterator; - while (iter != data.end() && key_columns[0]->size() < state->batch_size()) { - const auto& key = iter->get_first(); - auto& mapped = iter->get_second(); - // insert keys - agg_method.insert_key_into_columns(key, key_columns, _probe_key_sz); - - // serialize values - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize( - mapped + _offsets_of_aggregate_states[i], value_buffer_writers[i]); - value_buffer_writers[i].commit(); - } + + const auto size = std::min(data.size(), size_t(state->batch_size())); + using KeyType = std::decay_t<decltype(iter->get_first())>; + std::vector<KeyType> keys(size); + std::vector<AggregateDataPtr> values(size); + + size_t num_rows = 0; + while (iter != data.end() && num_rows < state->batch_size()) { + keys[num_rows] = iter->get_first(); + values[num_rows] = iter->get_second(); ++iter; + ++num_rows; + } + + agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz); + + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_vec( + values, _offsets_of_aggregate_states[i], value_buffer_writers[i], + num_rows); } if (iter == data.end()) { @@ -1166,8 +1184,6 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { }, _agg_data._aggregated_method_variant); - std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() 
== 1 && _aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref()); @@ -1179,21 +1195,16 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) { column = ((ColumnNullable*)column.get())->get_nested_column_ptr(); } - for (int j = 0; j < rows; ++j) { - VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j)); - _create_agg_status(deserialize_buffer.get()); - - _aggregate_evaluators[i]->function()->deserialize( - deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader, - &_agg_arena_pool); + std::unique_ptr<char[]> deserialize_buffer( + new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]); - _aggregate_evaluators[i]->function()->merge( - places.data()[j] + _offsets_of_aggregate_states[i], - deserialize_buffer.get() + _offsets_of_aggregate_states[i], - &_agg_arena_pool); + _aggregate_evaluators[i]->function()->deserialize_vec(deserialize_buffer.get(), + (ColumnString*)(column.get()), + &_agg_arena_pool, rows); + _aggregate_evaluators[i]->function()->merge_vec( + places.data(), _offsets_of_aggregate_states[i], deserialize_buffer.get(), + &_agg_arena_pool, rows); - _destroy_agg_status(deserialize_buffer.get()); - } } else { _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i], places.data(), &_agg_arena_pool); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 08834e4611..fd6b67f255 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -92,6 +92,11 @@ struct AggregationMethodSerialized { for (auto& column : key_columns) pos = column->deserialize_and_insert_from_arena(pos); } + static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) { + for (auto& column : key_columns) column->deserialize_vec(keys, num_rows); + } + void init_once() { if (!inited) { inited = true; @@ -136,6 +141,12 @@ struct
AggregationMethodStringNoCache { key_columns[0]->insert_data(key.data, key.size); } + static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) { + key_columns[0]->reserve(num_rows); + key_columns[0]->insert_many_strings(keys.data(), num_rows); + } + void init_once() { if (!inited) { inited = true; @@ -174,6 +185,16 @@ struct AggregationMethodOneNumber { column->insert_raw_data<sizeof(FieldType)>(key_holder); } + static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) { + key_columns[0]->reserve(num_rows); + auto* column = static_cast<ColumnVectorHelper*>(key_columns[0].get()); + for (size_t i = 0; i != num_rows; ++i) { + const auto* key_holder = reinterpret_cast<const char*>(&keys[i]); + column->insert_raw_data<sizeof(FieldType)>(key_holder); + } + } + void init_once() { if (!inited) { inited = true; @@ -275,6 +296,13 @@ struct AggregationMethodKeysFixed { } } + static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes& key_sizes) { + for (size_t i = 0; i != num_rows; ++i) { + insert_key_into_columns(keys[i], key_columns, key_sizes); + } + } + void init_once() { if (!inited) { inited = true; @@ -312,6 +340,17 @@ struct AggregationMethodSingleNullableColumn : public SingleColumnMethod { col->insert_data(reinterpret_cast<const char*>(&key), sizeof(key)); } } + + static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns, + const size_t num_rows, const Sizes&) { + auto col = key_columns[0].get(); + col->reserve(num_rows); + if constexpr (std::is_same_v<Key, StringRef>) { + col->insert_many_strings(keys.data(), num_rows); + } else { + col->insert_many_raw_data(reinterpret_cast<char*>(keys.data()), num_rows); + } + } }; using AggregatedDataWithUInt8Key = diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp 
b/be/src/vec/exprs/vectorized_agg_fn.cpp index 6882788e2c..427b90ec2f 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -136,6 +136,11 @@ void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) _function->insert_result_into(place, *column); } +void AggFnEvaluator::insert_result_info_vec(const std::vector<AggregateDataPtr>& places, + size_t offset, IColumn* column, const size_t num_rows) { + _function->insert_result_into_vec(places, offset, *column, num_rows); +} + void AggFnEvaluator::reset(AggregateDataPtr place) { _function->reset(place); } diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 8a4f777355..a541257487 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -58,6 +58,9 @@ public: void insert_result_info(AggregateDataPtr place, IColumn* column); + void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, size_t offset, + IColumn* column, const size_t num_rows); + void reset(AggregateDataPtr place); DataTypePtr& data_type() { return _data_type; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org