github-actions[bot] commented on code in PR #31151: URL: https://github.com/apache/doris/pull/31151#discussion_r1495733807
########## be/src/pipeline/exec/streaming_aggregation_operator.h: ########## @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/pipeline/exec/streaming_aggregation_operator.h: ########## @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
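A minimal sketch (hypothetical code, not from the PR) of what the modernize-deprecated-headers fix above amounts to: `<cstdint>` is the C++ spelling of the C header and guarantees the fixed-width types in namespace `std`:

```cpp
#include <cstdint>  // C++ spelling of <stdint.h>; declares std::int64_t etc.

// Hypothetical example: prefer the std-qualified fixed-width types.
std::int64_t sum_bytes(std::int32_t rows, std::int32_t row_size) {
    return static_cast<std::int64_t>(rows) * row_size;
}
```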
+ +#pragma once + +#include <stdint.h> + +#include <memory> + +#include "common/status.h" +#include "pipeline/pipeline_x/operator.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" + +namespace doris { +class RuntimeState; + +namespace pipeline { + +class StreamingAggOperatorX; + +class StreamingAggLocalState final : public PipelineXLocalState<FakeDependency> { +public: + using Parent = StreamingAggOperatorX; + using Base = PipelineXLocalState<FakeDependency>; + ENABLE_FACTORY_CREATOR(StreamingAggLocalState); + StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent); + ~StreamingAggLocalState() override = default; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); + void make_nullable_output_key(vectorized::Block* block); + +private: + friend class StreamingAggOperatorX; + template <typename LocalStateType> + friend class StatefulOperatorX; + + size_t _memory_usage() const; + Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block, + doris::vectorized::Block* out_block); + bool _should_expand_preagg_hash_tables(); + void _make_nullable_output_key(vectorized::Block* block); + Status _execute_without_key(vectorized::Block* block); + Status _merge_without_key(vectorized::Block* block); + void _update_memusage_without_key(); + Status _execute_with_serialized_key(vectorized::Block* block); + Status _merge_with_serialized_key(vectorized::Block* block); + void _update_memusage_with_serialized_key(); + void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); + Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + + template <bool limit, bool for_spill = false> + Status _merge_with_serialized_key_helper(vectorized::Block* block); + template <bool limit> + Status _execute_with_serialized_key_helper(vectorized::Block* block); + void _find_in_hash_table(vectorized::AggregateDataPtr* places, + vectorized::ColumnRawPtrs& key_columns, size_t num_rows); + int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator); + void _emplace_into_hash_table(vectorized::AggregateDataPtr* places, + vectorized::ColumnRawPtrs& key_columns, const size_t num_rows); Review Comment: warning: parameter 'num_rows' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion vectorized::ColumnRawPtrs& key_columns, size_t num_rows); ``` ########## be/src/pipeline/exec/streaming_aggregation_operator.h: ########## @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
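A small illustration (hypothetical code, not from the PR) of why readability-avoid-const-params-in-decls fires on the declaration above: top-level `const` on a by-value parameter is not part of the function's signature, so it only means something in the definition:

```cpp
#include <cstddef>

// Hypothetical example: both lines refer to the same function, because
// top-level const on a by-value parameter does not change the signature.
void emplace_rows(std::size_t num_rows);

// In the definition, const merely keeps the local copy from being modified.
void emplace_rows(const std::size_t num_rows) {
    // num_rows = 0;  // would fail to compile here
    (void)num_rows;
}
```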
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <memory> + +#include "common/status.h" +#include "pipeline/pipeline_x/operator.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" + +namespace doris { +class RuntimeState; + +namespace pipeline { + +class StreamingAggOperatorX; + +class StreamingAggLocalState final : public PipelineXLocalState<FakeDependency> { +public: + using Parent = StreamingAggOperatorX; + using Base = PipelineXLocalState<FakeDependency>; + ENABLE_FACTORY_CREATOR(StreamingAggLocalState); + StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent); + ~StreamingAggLocalState() override = default; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); + void make_nullable_output_key(vectorized::Block* block); + +private: + friend class StreamingAggOperatorX; + template <typename LocalStateType> + friend class StatefulOperatorX; + + size_t _memory_usage() const; + Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block, + doris::vectorized::Block* out_block); + bool _should_expand_preagg_hash_tables(); + void _make_nullable_output_key(vectorized::Block* block); + Status _execute_without_key(vectorized::Block* block); + Status _merge_without_key(vectorized::Block* block); + void _update_memusage_without_key(); + Status _execute_with_serialized_key(vectorized::Block* block); + Status _merge_with_serialized_key(vectorized::Block* block); + void _update_memusage_with_serialized_key(); + void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); + Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + Status _serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state); + + template <bool limit, bool for_spill = false> + Status _merge_with_serialized_key_helper(vectorized::Block* block); + template <bool limit> + Status _execute_with_serialized_key_helper(vectorized::Block* block); + void _find_in_hash_table(vectorized::AggregateDataPtr* places, + vectorized::ColumnRawPtrs& key_columns, size_t num_rows); + int _get_slot_column_id(const vectorized::AggFnEvaluator* evaluator); + void _emplace_into_hash_table(vectorized::AggregateDataPtr* places, + vectorized::ColumnRawPtrs& key_columns, const size_t num_rows); + Status _create_agg_status(vectorized::AggregateDataPtr data); + size_t _get_hash_table_size(); + + RuntimeProfile::Counter* _queue_byte_size_counter = nullptr; + RuntimeProfile::Counter* _queue_size_counter = nullptr; + RuntimeProfile::Counter* _streaming_agg_timer = nullptr; + RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; + RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; + RuntimeProfile::Counter* 
_hash_table_input_counter = nullptr; + RuntimeProfile::Counter* _build_timer = nullptr; + RuntimeProfile::Counter* _expr_timer = nullptr; + RuntimeProfile::Counter* _build_table_convert_timer = nullptr; + RuntimeProfile::Counter* _serialize_key_timer = nullptr; + RuntimeProfile::Counter* _merge_timer = nullptr; + RuntimeProfile::Counter* _serialize_data_timer = nullptr; + RuntimeProfile::Counter* _deserialize_data_timer = nullptr; + RuntimeProfile::Counter* _max_row_size_counter = nullptr; + RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; + RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; + RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _get_results_timer = nullptr; + RuntimeProfile::Counter* _serialize_result_timer = nullptr; + RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr; + RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; + + bool _should_expand_hash_table = true; + int64_t _cur_num_rows_returned = 0; + std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr; + vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr; + std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; + // group by k1,k2 + vectorized::VExprContextSPtrs _probe_expr_ctxs; + std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr; + std::unique_ptr<vectorized::AggregateDataContainer> _aggregate_data_container = nullptr; + bool _should_limit_output = false; + bool _reach_limit = false; + size_t _input_num_rows = 0; + + vectorized::PODArray<vectorized::AggregateDataPtr> _places; + std::vector<char> _deserialize_buffer; + + struct ExecutorBase { + virtual Status execute(StreamingAggLocalState* local_state, vectorized::Block* block) = 0; + virtual void update_memusage(StreamingAggLocalState* local_state) = 0; + virtual Status get_result(StreamingAggLocalState* local_state, RuntimeState* state, + vectorized::Block* block, SourceState& source_state) = 0; + virtual ~ExecutorBase() = default; + }; + template <bool WithoutKey, bool NeedToMerge, bool NeedFinalize> + struct Executor final : public ExecutorBase { + Status get_result(StreamingAggLocalState* local_state, RuntimeState* state, + vectorized::Block* block, SourceState& source_state) override { + if constexpr (WithoutKey) { + if constexpr (NeedFinalize) { + return local_state->_get_without_key_result(state, block, source_state); + } else { + return local_state->_serialize_without_key(state, block, source_state); + } + } else { + if constexpr (NeedFinalize) { + return local_state->_get_with_serialized_key_result(state, block, source_state); + } else { + return local_state->_serialize_with_serialized_key_result(state, block, + source_state); + } + } + } + + Status execute(StreamingAggLocalState* local_state, vectorized::Block* block) override { + if constexpr (WithoutKey) { + if constexpr (NeedToMerge) { + return local_state->_merge_without_key(block); + } else { + return local_state->_execute_without_key(block); + } + } else { + if constexpr (NeedToMerge) { + return local_state->_merge_with_serialized_key(block); + } else { + return local_state->_execute_with_serialized_key(block); + } + } + } + + void update_memusage(StreamingAggLocalState* local_state) override { + if constexpr (WithoutKey) { + local_state->_update_memusage_without_key(); + } else { + local_state->_update_memusage_with_serialized_key(); + } + } + }; + std::unique_ptr<ExecutorBase> _executor = nullptr; + + struct MemoryRecord { + MemoryRecord() : 
used_in_arena(0), used_in_state(0) {} + int64_t used_in_arena;
Review Comment: warning: use default member initializer for 'used_in_arena' [modernize-use-default-member-init] be/src/pipeline/exec/streaming_aggregation_operator.h:178:
```diff
- MemoryRecord() : used_in_arena(0), used_in_state(0) {}
- int64_t used_in_arena;
+ MemoryRecord() : used_in_state(0) {}
+ int64_t used_in_arena{0};
```
########## be/src/pipeline/exec/streaming_aggregation_operator.h: ########## @@ -0,0 +1,231 @@ ... + struct MemoryRecord { + MemoryRecord() : used_in_arena(0), used_in_state(0) {} + int64_t used_in_arena; + int64_t used_in_state;
Review Comment: warning: use default member initializer for 'used_in_state' [modernize-use-default-member-init] be/src/pipeline/exec/streaming_aggregation_operator.h:178:
```diff
- MemoryRecord() : used_in_arena(0), used_in_state(0) {}
+ MemoryRecord() : used_in_arena(0) {}
```
```suggestion
int64_t used_in_state{0};
```
########## be/src/pipeline/exec/streaming_aggregation_operator.cpp: ########## @@ -0,0 +1,1302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "streaming_aggregation_operator.h" + +#include <gen_cpp/Metrics_types.h> + +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "pipeline/exec/operator.h" + +namespace doris { +class RuntimeState; +} // namespace doris + +namespace doris::pipeline { +/// The minimum reduction factor (input rows divided by output rows) to grow hash tables +/// in a streaming preaggregation, given that the hash tables are currently the given +/// size or above. The sizes roughly correspond to hash table sizes where the bucket +/// arrays will fit in a cache level. Intuitively, we don't want the working set of the +/// aggregation to expand to the next level of cache unless we're reducing the input +/// enough to outweigh the increased memory latency we'll incur for each hash table +/// lookup. +/// +/// Note that the current reduction achieved is not always a good estimate of the +/// final reduction. It may be biased either way depending on the ordering of the +/// input. If the input order is random, we will underestimate the final reduction +/// factor because the probability of a row having the same key as a previous row +/// increases as more input is processed. If the input order is correlated with the +/// key, skew may bias the estimate. 
If high cardinality keys appear first, we +/// may overestimate and if low cardinality keys appear first, we underestimate. +/// To estimate the eventual reduction achieved, we estimate the final reduction +/// using the planner's estimated input cardinality and the assumption that input +/// is in a random order. This means that we assume that the reduction factor will +/// increase over time. +struct StreamingHtMinReductionEntry { + // Use 'streaming_ht_min_reduction' if the total size of hash table bucket directories in + // bytes is greater than this threshold. + int min_ht_mem; + // The minimum reduction factor to expand the hash tables. + double streaming_ht_min_reduction; +}; + +// TODO: experimentally tune these values and also programmatically get the cache size +// of the machine that we're running on. +static constexpr StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = { + // Expand up to L2 cache always. + {0, 0.0}, + // Expand into L3 cache if we look like we're getting some reduction. + // At present, The L2 cache is generally 1024k or more + {1024 * 1024, 1.1}, + // Expand into main memory if we're getting a significant reduction. + // The L3 cache is generally 16MB or more + {16 * 1024 * 1024, 2.0}, +}; + +static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = + sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); + +StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent), + _agg_arena_pool(std::make_unique<vectorized::Arena>()), + _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()), + _agg_profile_arena(std::make_unique<vectorized::Arena>()), + _child_block(vectorized::Block::create_unique()), + _child_source_state(SourceState::DEPEND_ON_SOURCE), + _pre_aggregated_block(vectorized::Block::create_unique()) {} + +Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_open_timer); + auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); + for (auto& evaluator : p._aggregate_evaluators) { + _aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); + } + _probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); + for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i])); + } + _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", + TUnit::BYTES, "MemoryUsage", 1); + _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( + "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); + + _build_timer = ADD_TIMER(Base::profile(), "BuildTime"); + _build_table_convert_timer = ADD_TIMER(Base::profile(), "BuildConvertToPartitionedTime"); + _serialize_key_timer = ADD_TIMER(Base::profile(), "SerializeKeyTime"); + _exec_timer = ADD_TIMER(Base::profile(), "ExecTime"); + _merge_timer = ADD_TIMER(Base::profile(), "MergeTime"); + _expr_timer = ADD_TIMER(Base::profile(), "ExprTime"); + _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime"); + _deserialize_data_timer = ADD_TIMER(Base::profile(), "DeserializeAndMergeTime"); + _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); + _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); + _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); + _max_row_size_counter = 
ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); + _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); + _queue_byte_size_counter = ADD_COUNTER(profile(), "MaxSizeInBlockQueue", TUnit::BYTES); + _queue_size_counter = ADD_COUNTER(profile(), "MaxSizeOfBlockQueue", TUnit::UNIT); + _streaming_agg_timer = ADD_TIMER(profile(), "StreamingAggTime"); + _build_timer = ADD_TIMER(profile(), "BuildTime"); + _expr_timer = ADD_TIMER(Base::profile(), "ExprTime"); + _get_results_timer = ADD_TIMER(profile(), "GetResultsTime"); + _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime"); + _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime"); + _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); + COUNTER_SET(_max_row_size_counter, (int64_t)0); + + for (auto& evaluator : _aggregate_evaluators) { + evaluator->set_timer(_merge_timer, _expr_timer); + } + + if (_probe_expr_ctxs.empty()) { + _agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>( + _agg_profile_arena->alloc(p._total_size_of_aggregate_states)); + + if (p._is_merge) { + if (p._needs_finalize) { + _executor = std::make_unique<Executor<true, true, true>>(); + } else { + _executor = std::make_unique<Executor<true, true, false>>(); + } + } else { + if (p._needs_finalize) { + _executor = std::make_unique<Executor<true, false, true>>(); + } else { + _executor = std::make_unique<Executor<true, false, false>>(); + } + } + } else { + _init_hash_method(_probe_expr_ctxs); + + std::visit( + [&](auto&& agg_method) { + using HashTableType = std::decay_t<decltype(agg_method)>; + using KeyType = typename HashTableType::Key; + + /// some aggregate functions (like AVG for decimal) have align issues. + _aggregate_data_container.reset(new vectorized::AggregateDataContainer( + sizeof(KeyType), + ((p._total_size_of_aggregate_states + p._align_aggregate_states - 1) / + p._align_aggregate_states) * + p._align_aggregate_states)); + }, + _agg_data->method_variant); + if (p._is_merge) { + if (p._needs_finalize) { + _executor = std::make_unique<Executor<false, true, true>>(); + } else { + _executor = std::make_unique<Executor<false, true, false>>(); + } + } else { + if (p._needs_finalize) { + _executor = std::make_unique<Executor<false, false, true>>(); + } else { + _executor = std::make_unique<Executor<false, false, false>>(); + } + } + + _should_limit_output = p._limit != -1 && // has limit + (!p._have_conjuncts) && // no having conjunct + p._needs_finalize; // agg's finalize step + } + return Status::OK(); +} + +void StreamingAggLocalState::_make_nullable_output_key(vectorized::Block* block) { + if (block->rows() != 0) { + for (auto cid : Base::_parent->cast<StreamingAggOperatorX>()._make_nullable_keys) { + block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } + } +} + +Status StreamingAggLocalState::_execute_without_key(vectorized::Block* block) { + DCHECK(_agg_data->without_key != nullptr); + SCOPED_TIMER(_build_timer); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( + block, + _agg_data->without_key + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _agg_arena_pool.get())); + } + return Status::OK(); +} + +template <bool limit, bool for_spill> +Status 
StreamingAggLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) { + SCOPED_TIMER(_merge_timer); + + size_t key_size = _probe_expr_ctxs.size(); + vectorized::ColumnRawPtrs key_columns(key_size); + + for (size_t i = 0; i < key_size; ++i) { + if constexpr (for_spill) { + key_columns[i] = block->get_by_position(i).column.get(); + } else { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id)); + block->replace_by_position_if_const(result_column_id); + key_columns[i] = block->get_by_position(result_column_id).column.get(); + } + } + + int rows = block->rows(); + if (_places.size() < rows) { + _places.resize(rows); + } + + if constexpr (limit) { + _find_in_hash_table(_places.data(), key_columns, rows); + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators[i]->is_merge()) { + int col_id = _get_slot_column_id(_aggregate_evaluators[i]); + auto column = block->get_by_position(col_id).column; + if (column->is_nullable()) { + column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr(); + } + + size_t buffer_size = _aggregate_evaluators[i]->function()->size_of_data() * rows; + if (_deserialize_buffer.size() < buffer_size) { + _deserialize_buffer.resize(buffer_size); + } + + { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec_selected( + _places.data(), + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); + } + } else { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( + block, + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _places.data(), _agg_arena_pool.get())); + } + } + } else { + _emplace_into_hash_table(_places.data(), key_columns, rows); + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators[i]->is_merge() || for_spill) { + int col_id = 0; + if constexpr (for_spill) { + col_id = _probe_expr_ctxs.size() + i; + } else { + col_id = _get_slot_column_id(_aggregate_evaluators[i]); + } + auto column = block->get_by_position(col_id).column; + if (column->is_nullable()) { + column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr(); + } + + size_t buffer_size = _aggregate_evaluators[i]->function()->size_of_data() * rows; + if (_deserialize_buffer.size() < buffer_size) { + _deserialize_buffer.resize(buffer_size); + } + + { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_and_merge_vec( + _places.data(), + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _deserialize_buffer.data(), (vectorized::ColumnString*)(column.get()), + _agg_arena_pool.get(), rows); + } + } else { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + block, + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _places.data(), _agg_arena_pool.get())); + } + } + + if (_should_limit_output) { + _reach_limit = _get_hash_table_size() >= + Base::_parent->template cast<StreamingAggOperatorX>()._limit; + } + } + + return Status::OK(); +} + +size_t StreamingAggLocalState::_get_hash_table_size() { + return std::visit([&](auto&& agg_method) { return agg_method.hash_table->size(); }, + _agg_data->method_variant); +} + +Status 
StreamingAggLocalState::_merge_without_key(vectorized::Block* block) { + SCOPED_TIMER(_merge_timer); + DCHECK(_agg_data->without_key != nullptr); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + if (_aggregate_evaluators[i]->is_merge()) { + int col_id = _get_slot_column_id(_aggregate_evaluators[i]); + auto column = block->get_by_position(col_id).column; + if (column->is_nullable()) { + column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr(); + } + + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_and_merge_from_column( + _agg_data->without_key + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + *column, _agg_arena_pool.get()); + } else { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( + block, + _agg_data->without_key + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _agg_arena_pool.get())); + } + } + return Status::OK(); +} + +void StreamingAggLocalState::_update_memusage_without_key() { + auto arena_memory_usage = _agg_arena_pool->size() - _mem_usage_record.used_in_arena; + Base::_mem_tracker->consume(arena_memory_usage); + _serialize_key_arena_memory_usage->add(arena_memory_usage); + _mem_usage_record.used_in_arena = _agg_arena_pool->size(); +} + +Status StreamingAggLocalState::_execute_with_serialized_key(vectorized::Block* block) { + if (_reach_limit) { + return _execute_with_serialized_key_helper<true>(block); + } else { + return _execute_with_serialized_key_helper<false>(block); + } +} + +void StreamingAggLocalState::_update_memusage_with_serialized_key() { + std::visit( + [&](auto&& agg_method) -> void { + auto& data = *agg_method.hash_table; + auto arena_memory_usage = _agg_arena_pool->size() + + _aggregate_data_container->memory_usage() - + _mem_usage_record.used_in_arena; + Base::_mem_tracker->consume(arena_memory_usage); + Base::_mem_tracker->consume(data.get_buffer_size_in_bytes() - + _mem_usage_record.used_in_state); + _serialize_key_arena_memory_usage->add(arena_memory_usage); + COUNTER_UPDATE(_hash_table_memory_usage, + data.get_buffer_size_in_bytes() - _mem_usage_record.used_in_state); + _mem_usage_record.used_in_state = data.get_buffer_size_in_bytes(); + _mem_usage_record.used_in_arena = + _agg_arena_pool->size() + _aggregate_data_container->memory_usage(); + }, + _agg_data->method_variant); +} + +template <bool limit> +Status StreamingAggLocalState::_execute_with_serialized_key_helper(vectorized::Block* block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_probe_expr_ctxs.empty()); + + size_t key_size = _probe_expr_ctxs.size(); + vectorized::ColumnRawPtrs key_columns(key_size); + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id)); + block->get_by_position(result_column_id).column = + block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = block->get_by_position(result_column_id).column.get(); + } + } + + int rows = block->rows(); + if (_places.size() < rows) { + _places.resize(rows); + } + + if constexpr (limit) { + _find_in_hash_table(_places.data(), key_columns, rows); + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected( + block, + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _places.data(), 
_agg_arena_pool.get())); + } + } else { + _emplace_into_hash_table(_places.data(), key_columns, rows); + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + block, + Base::_parent->template cast<StreamingAggOperatorX>() + ._offsets_of_aggregate_states[i], + _places.data(), _agg_arena_pool.get())); + } + + if (_should_limit_output) { + _reach_limit = _get_hash_table_size() >= + Base::_parent->template cast<StreamingAggOperatorX>()._limit; + if (_reach_limit && + Base::_parent->template cast<StreamingAggOperatorX>()._can_short_circuit) { + Base::_dependency->set_ready_to_read(); + return Status::Error<ErrorCode::END_OF_FILE>(""); + } + } + } + + return Status::OK(); +} + +// We should call this function only at 1st phase. +// 1st phase: is_merge=true, only have one SlotRef. +// 2nd phase: is_merge=false, maybe have multiple exprs. +int StreamingAggLocalState::_get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) { + auto ctxs = evaluator->input_exprs_ctxs(); + CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref()) + << "input_exprs_ctxs is invalid, input_exprs_ctx[0]=" + << ctxs[0]->root()->debug_string(); + return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id(); +} + +void StreamingAggLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places, + vectorized::ColumnRawPtrs& key_columns, + size_t num_rows) { + std::visit( + [&](auto&& agg_method) -> void { + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + AggState state(key_columns); + agg_method.init_serialized_keys(key_columns, num_rows); + + /// For all rows. + for (size_t i = 0; i < num_rows; ++i) { + auto find_result = agg_method.find(state, i); + + if (find_result.is_found()) { + places[i] = find_result.get_mapped(); + } else { + places[i] = nullptr; + } + } + }, + _agg_data->method_variant); +} + +Status StreamingAggLocalState::_merge_with_serialized_key(vectorized::Block* block) { + if (_reach_limit) { + return _merge_with_serialized_key_helper<true, false>(block); + } else { + return _merge_with_serialized_key_helper<false, false>(block); + } +} + +void StreamingAggLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { + DCHECK(probe_exprs.size() >= 1); + + using Type = vectorized::AggregatedDataVariants::Type; + Type t(Type::serialized); + + if (probe_exprs.size() == 1) { + auto is_nullable = probe_exprs[0]->root()->is_nullable(); + PrimitiveType type = probe_exprs[0]->root()->result_type(); + switch (type) { + case TYPE_TINYINT: + case TYPE_BOOLEAN: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_FLOAT: + case TYPE_DATEV2: + case TYPE_BIGINT: + case TYPE_DOUBLE: + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + case TYPE_LARGEINT: + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: { + size_t size = get_primitive_type_size(type); + if (size == 1) { + t = Type::int8_key; + } else if (size == 2) { + t = Type::int16_key; + } else if (size == 4) { + t = Type::int32_key; + } else if (size == 8) { + t = Type::int64_key; + } else if (size == 16) { + t = Type::int128_key; + } else { + throw Exception(ErrorCode::INTERNAL_ERROR, + "meet invalid type size, size={}, type={}", size, + type_to_string(type)); + } + break; + } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + t = Type::string_key; + break; + } + default: + t = Type::serialized; + } + + _agg_data->init( + 
get_hash_key_type_with_phase( + t, !Base::_parent->template cast<StreamingAggOperatorX>()._is_first_phase), + is_nullable); + } else { + if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32, + vectorized::AggregateDataPtr>(_agg_data->method_variant, + probe_exprs)) { + _agg_data->init(Type::serialized); + } + } +} + +Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block, + vectorized::Block* output_block) { + RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block)); + + // pre stream agg need use _num_row_return to decide whether to do pre stream agg + _cur_num_rows_returned += output_block->rows(); + _make_nullable_output_key(output_block); + // COUNTER_SET(_rows_returned_counter, _num_rows_returned); + _executor->update_memusage(this); + return Status::OK(); +} + +bool StreamingAggLocalState::_should_expand_preagg_hash_tables() { + if (!_should_expand_hash_table) { + return false; + } + + return std::visit( + [&](auto&& agg_method) -> bool { + auto& hash_tbl = *agg_method.hash_table; + auto [ht_mem, ht_rows] = + std::pair {hash_tbl.get_buffer_size_in_bytes(), hash_tbl.size()}; + + // Need some rows in tables to have valid statistics. + if (ht_rows == 0) { + return true; + } + + // Find the appropriate reduction factor in our table for the current hash table sizes. + int cache_level = 0; + while (cache_level + 1 < STREAMING_HT_MIN_REDUCTION_SIZE && + ht_mem >= STREAMING_HT_MIN_REDUCTION[cache_level + 1].min_ht_mem) { + ++cache_level; + } + + // Compare the number of rows in the hash table with the number of input rows that + // were aggregated into it. Exclude passed through rows from this calculation since + // they were not in hash tables. + const int64_t input_rows = _input_num_rows; + const int64_t aggregated_input_rows = input_rows - _cur_num_rows_returned; + // TODO chenhao + // const int64_t expected_input_rows = estimated_input_cardinality_ - num_rows_returned_; + double current_reduction = static_cast<double>(aggregated_input_rows) / ht_rows; + + // TODO: workaround for IMPALA-2490: subplan node rows_returned counter may be + // inaccurate, which could lead to a divide by zero below. + if (aggregated_input_rows <= 0) { + return true; + } + + // Extrapolate the current reduction factor (r) using the formula + // R = 1 + (N / n) * (r - 1), where R is the reduction factor over the full input data + // set, N is the number of input rows, excluding passed-through rows, and n is the + // number of rows inserted or merged into the hash tables. This is a very rough + // approximation but is good enough to be useful. + // TODO: consider collecting more statistics to better estimate reduction. + // double estimated_reduction = aggregated_input_rows >= expected_input_rows + // ? 
current_reduction + // : 1 + (expected_input_rows / aggregated_input_rows) * (current_reduction - 1); + double min_reduction = + STREAMING_HT_MIN_REDUCTION[cache_level].streaming_ht_min_reduction; + + // COUNTER_SET(preagg_estimated_reduction_, estimated_reduction); + // COUNTER_SET(preagg_streaming_ht_min_reduction_, min_reduction); + // return estimated_reduction > min_reduction; + _should_expand_hash_table = current_reduction > min_reduction; + return _should_expand_hash_table; + }, + _agg_data->method_variant); +} + +size_t StreamingAggLocalState::_memory_usage() const { + size_t usage = 0; + if (_agg_arena_pool) { + usage += _agg_arena_pool->size(); + } + + if (_aggregate_data_container) { + usage += _aggregate_data_container->memory_usage(); + } + + return usage; +} + +Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block, + doris::vectorized::Block* out_block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_probe_expr_ctxs.empty()); + + auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); + + size_t key_size = _probe_expr_ctxs.size(); + vectorized::ColumnRawPtrs key_columns(key_size); + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id)); + in_block->get_by_position(result_column_id).column = + in_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = in_block->get_by_position(result_column_id).column.get(); + } + } + + int rows = in_block->rows(); + _places.resize(rows); + + // Stop expanding hash tables if we're not reducing the input sufficiently. As our + // hash tables expand out of each level of cache hierarchy, every hash table lookup + // will take longer. We also may not be able to expand hash tables because of memory + // pressure. In either case we should always use the remaining space in the hash table + // to avoid wasting memory. + // But for fixed hash map, it never need to expand + bool ret_flag = false; + RETURN_IF_ERROR(std::visit( + [&](auto&& agg_method) -> Status { + if (auto& hash_tbl = *agg_method.hash_table; + hash_tbl.add_elem_size_overflow(rows)) { + /// If too much memory is used during the pre-aggregation stage, + /// it is better to output the data directly without performing further aggregation. + const bool used_too_much_memory = + (_parent->cast<StreamingAggOperatorX>()._external_agg_bytes_threshold > + 0 && + _memory_usage() > _parent->cast<StreamingAggOperatorX>() + ._external_agg_bytes_threshold); + // do not try to do agg, just init and serialize directly return the out_block + if (!_should_expand_preagg_hash_tables() || used_too_much_memory) { + SCOPED_TIMER(_streaming_agg_timer); + ret_flag = true; + + // will serialize value data to string column. + // non-nullable column(id in `_make_nullable_keys`) + // will be converted to nullable. 
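An editorial sketch (standalone, hypothetical names) of the expansion policy implemented by `_should_expand_preagg_hash_tables` above: pick the threshold row of `STREAMING_HT_MIN_REDUCTION` matching the cache level the hash table currently occupies, then keep expanding only while the observed reduction factor (aggregated input rows divided by groups) beats that row's minimum. The commented-out extrapolation R = 1 + (N / n) * (r - 1) is left out here, as in the quoted code:

```cpp
#include <cstddef>
#include <cstdint>

// Standalone sketch of the quoted heuristic (hypothetical, simplified).
struct MinReductionEntry {
    std::size_t min_ht_mem; // applies once the bucket array reaches this size
    double min_reduction;   // required (aggregated input rows / groups) ratio
};
constexpr MinReductionEntry kMinReduction[] = {
        {0, 0.0},                // fits in L2: always expand
        {1024 * 1024, 1.1},      // spills into L3: need some reduction
        {16 * 1024 * 1024, 2.0}, // spills into main memory: need a lot
};
constexpr int kLevels = sizeof(kMinReduction) / sizeof(kMinReduction[0]);

bool should_expand(std::size_t ht_bytes, std::int64_t aggregated_rows,
                   std::size_t groups) {
    if (groups == 0 || aggregated_rows <= 0) {
        return true; // no usable statistics yet, same guard as the quoted code
    }
    int level = 0;
    while (level + 1 < kLevels && ht_bytes >= kMinReduction[level + 1].min_ht_mem) {
        ++level;
    }
    const double reduction = static_cast<double>(aggregated_rows) / groups;
    return reduction > kMinReduction[level].min_reduction;
}
```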
+ bool mem_reuse = p._make_nullable_keys.empty() && out_block->mem_reuse(); + + std::vector<vectorized::DataTypePtr> data_types; + vectorized::MutableColumns value_columns; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + auto data_type = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size).column) + .mutate()); + } else { + // slot type of value it should always be string type + value_columns.emplace_back(_aggregate_evaluators[i] + ->function() + ->create_serialize_column()); + } + data_types.emplace_back(data_type); + } + + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + RETURN_IF_ERROR( + _aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, + _agg_arena_pool.get())); + } + + if (!mem_reuse) { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + columns_with_schema.emplace_back( + key_columns[i]->clone_resized(rows), + _probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + for (int i = 0; i < value_columns.size(); ++i) { + columns_with_schema.emplace_back(std::move(value_columns[i]), + data_types[i], ""); + } + out_block->swap(vectorized::Block(columns_with_schema)); + } else { + for (int i = 0; i < key_size; ++i) { + std::move(*out_block->get_by_position(i).column) + .mutate() + ->insert_range_from(*key_columns[i], 0, rows); + } + } + } + } + return Status::OK(); + }, + _agg_data->method_variant)); + + if (!ret_flag) { + RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows)); + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add( + in_block, p._offsets_of_aggregate_states[i], _places.data(), + _agg_arena_pool.get(), _should_expand_hash_table)); + } + } + + return Status::OK(); +} + +Status StreamingAggLocalState::_create_agg_status(vectorized::AggregateDataPtr data) { + auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + try { + _aggregate_evaluators[i]->create(data + p._offsets_of_aggregate_states[i]); + } catch (...) { + for (int j = 0; j < i; ++j) { + _aggregate_evaluators[j]->destroy(data + p._offsets_of_aggregate_states[j]); + } + throw; + } + } + return Status::OK(); +} + +Status StreamingAggLocalState::_get_with_serialized_key_result(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { + auto& p = _parent->cast<StreamingAggOperatorX>(); + // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. 
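`_create_agg_status` above uses a create-then-roll-back pattern for exception safety. A generic sketch (hypothetical helper, not PR code) of the same idea: if building state `i` throws, the states already built are destroyed before the exception propagates, so nothing leaks:

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical helper mirroring the rollback in _create_agg_status: build
// sub-states in order; on failure tear down the ones already built, then
// let the exception propagate to the caller.
void create_all(const std::vector<std::function<void()>>& create,
                const std::vector<std::function<void()>>& destroy) {
    for (std::size_t i = 0; i < create.size(); ++i) {
        try {
            create[i]();
        } catch (...) {
            for (std::size_t j = i; j-- > 0;) {
                destroy[j](); // undo in reverse construction order
            }
            throw;
        }
    }
}
```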
+ bool mem_reuse = p._make_nullable_keys.empty() && block->mem_reuse(); + + auto columns_with_schema = + vectorized::VectorizedUtils::create_columns_with_type_and_name(p._row_descriptor); + int key_size = _probe_expr_ctxs.size(); + + vectorized::MutableColumns key_columns; + for (int i = 0; i < key_size; ++i) { + if (!mem_reuse) { + key_columns.emplace_back(columns_with_schema[i].type->create_column()); + } else { + key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); + } + } + vectorized::MutableColumns value_columns; + for (int i = key_size; i < columns_with_schema.size(); ++i) { + if (!mem_reuse) { + value_columns.emplace_back(columns_with_schema[i].type->create_column()); + } else { + value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); + } + } + + SCOPED_TIMER(_get_results_timer); + std::visit( + [&](auto&& agg_method) -> void { + auto& data = *agg_method.hash_table; + agg_method.init_iterator(); + const auto size = std::min(data.size(), size_t(state->batch_size())); + using KeyType = std::decay_t<decltype(agg_method.iterator->get_first())>; + std::vector<KeyType> keys(size); + if (_values.size() < size) { + _values.resize(size); + } + + size_t num_rows = 0; + _aggregate_data_container->init_once(); + auto& iter = _aggregate_data_container->iterator; + + { + SCOPED_TIMER(_hash_table_iterate_timer); + while (iter != _aggregate_data_container->end() && + num_rows < state->batch_size()) { + keys[num_rows] = iter.template get_key<KeyType>(); + _values[num_rows] = iter.get_aggregate_data(); + ++iter; + ++num_rows; + } + } + + { + SCOPED_TIMER(_insert_keys_to_column_timer); + agg_method.insert_keys_into_columns(keys, key_columns, num_rows); + } + + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->insert_result_info_vec( + _values, p._offsets_of_aggregate_states[i], value_columns[i].get(), + num_rows); + } + + if (iter == _aggregate_data_container->end()) { + if (agg_method.hash_table->has_null_key_data()) { + // only one key of group by support wrap null key + // here need additional processing logic on the null key / value + DCHECK(key_columns.size() == 1); + DCHECK(key_columns[0]->is_nullable()); + if (key_columns[0]->size() < state->batch_size()) { + key_columns[0]->insert_data(nullptr, 0); + auto mapped = agg_method.hash_table->template get_null_key_data< + vectorized::AggregateDataPtr>(); + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) + _aggregate_evaluators[i]->insert_result_info( + mapped + p._offsets_of_aggregate_states[i], + value_columns[i].get()); + source_state = SourceState::FINISHED; + } + } else { + source_state = SourceState::FINISHED; + } + } + }, + _agg_data->method_variant); + + if (!mem_reuse) { + *block = columns_with_schema; + vectorized::MutableColumns columns(block->columns()); + for (int i = 0; i < block->columns(); ++i) { + if (i < key_size) { + columns[i] = std::move(key_columns[i]); + } else { + columns[i] = std::move(value_columns[i - key_size]); + } + } + block->set_columns(std::move(columns)); + } + + return Status::OK(); +} + +Status StreamingAggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + // 1. 
`child(0)->rows_returned() == 0` mean not data from child + // in level two aggregation node should return NULL result + // level one aggregation node set `eos = true` return directly + SCOPED_TIMER(_serialize_result_timer); + if (UNLIKELY(_input_num_rows == 0)) { + source_state = SourceState::FINISHED; + return Status::OK(); + } + block->clear(); + + DCHECK(_agg_data->without_key != nullptr); + int agg_size = _aggregate_evaluators.size(); + + vectorized::MutableColumns value_columns(agg_size); + std::vector<vectorized::DataTypePtr> data_types(agg_size); + // will serialize data to string column + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); + value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); + } + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_without_key_to_column( + _agg_data->without_key + + _parent->cast<StreamingAggOperatorX>()._offsets_of_aggregate_states[i], + *value_columns[i]); + } + + { + vectorized::ColumnsWithTypeAndName data_with_schema; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + vectorized::ColumnWithTypeAndName column_with_schema = {nullptr, data_types[i], ""}; + data_with_schema.push_back(std::move(column_with_schema)); + } + *block = vectorized::Block(data_with_schema); + } + + block->set_columns(std::move(value_columns)); + source_state = SourceState::FINISHED; + return Status::OK(); +} + +Status StreamingAggLocalState::_serialize_with_serialized_key_result(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { + SCOPED_TIMER(_serialize_result_timer); + auto& p = _parent->cast<StreamingAggOperatorX>(); + int key_size = _probe_expr_ctxs.size(); + int agg_size = _aggregate_evaluators.size(); + vectorized::MutableColumns value_columns(agg_size); + vectorized::DataTypes value_data_types(agg_size); + + // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. 
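The result paths quoted above (`_get_with_serialized_key_result`, `_serialize_without_key`, and the function that follows) all drain aggregate state in batches: emit at most `state->batch_size()` groups per call and report `SourceState::FINISHED` only once the persistent iterator is exhausted. A compact sketch of that pattern with hypothetical types (plain STL, not the Doris containers):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for the aggregate-data container.
using GroupMap = std::unordered_map<std::string, std::int64_t>;

// 'it' persists across calls; returns true when every group has been
// emitted (the caller would translate that into SourceState::FINISHED).
bool drain_batch(const GroupMap& groups, GroupMap::const_iterator& it,
                 std::size_t batch_size,
                 std::vector<std::pair<std::string, std::int64_t>>& out) {
    out.clear();
    while (it != groups.end() && out.size() < batch_size) {
        out.emplace_back(it->first, it->second);
        ++it;
    }
    return it == groups.end();
}
```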
+ bool mem_reuse = p._make_nullable_keys.empty() && block->mem_reuse(); + + vectorized::MutableColumns key_columns; + for (int i = 0; i < key_size; ++i) { + if (mem_reuse) { + key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); + } else { + key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column()); + } + } + + SCOPED_TIMER(_get_results_timer); + std::visit( + [&](auto&& agg_method) -> void { + agg_method.init_iterator(); + auto& data = *agg_method.hash_table; + const auto size = std::min(data.size(), size_t(state->batch_size())); + using KeyType = std::decay_t<decltype(agg_method.iterator->get_first())>; + std::vector<KeyType> keys(size); + if (_values.size() < size + 1) { + _values.resize(size + 1); + } + + size_t num_rows = 0; + _aggregate_data_container->init_once(); + auto& iter = _aggregate_data_container->iterator; + + { + SCOPED_TIMER(_hash_table_iterate_timer); + while (iter != _aggregate_data_container->end() && + num_rows < state->batch_size()) { + keys[num_rows] = iter.template get_key<KeyType>(); + _values[num_rows] = iter.get_aggregate_data(); + ++iter; + ++num_rows; + } + } + + { + SCOPED_TIMER(_insert_keys_to_column_timer); + agg_method.insert_keys_into_columns(keys, key_columns, num_rows); + } + + if (iter == _aggregate_data_container->end()) { + if (agg_method.hash_table->has_null_key_data()) { + // only one key of group by support wrap null key + // here need additional processing logic on the null key / value + DCHECK(key_columns.size() == 1); + DCHECK(key_columns[0]->is_nullable()); + if (agg_method.hash_table->has_null_key_data()) { + key_columns[0]->insert_data(nullptr, 0); + _values[num_rows] = agg_method.hash_table->template get_null_key_data< + vectorized::AggregateDataPtr>(); + ++num_rows; + source_state = SourceState::FINISHED; + } + } else { + source_state = SourceState::FINISHED; + } + } + + { + SCOPED_TIMER(_serialize_data_timer); + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns[i] = + std::move(*block->get_by_position(i + key_size).column) + .mutate(); + } else { + value_columns[i] = + _aggregate_evaluators[i]->function()->create_serialize_column(); + } + _aggregate_evaluators[i]->function()->serialize_to_column( + _values, p._offsets_of_aggregate_states[i], value_columns[i], + num_rows); + } + } + }, + _agg_data->method_variant); + + if (!mem_reuse) { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + columns_with_schema.emplace_back(std::move(key_columns[i]), + _probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + for (int i = 0; i < agg_size; ++i) { + columns_with_schema.emplace_back(std::move(value_columns[i]), value_data_types[i], ""); + } + *block = vectorized::Block(columns_with_schema); + } + + return Status::OK(); +} + +void StreamingAggLocalState::make_nullable_output_key(vectorized::Block* block) { + if (block->rows() != 0) { + for (auto cid : _parent->cast<StreamingAggOperatorX>()._make_nullable_keys) { + block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); + block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type); + } + } +} + +Status StreamingAggLocalState::_get_without_key_result(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { + DCHECK(_agg_data->without_key != nullptr); + 
+    block->clear();
+
+    auto& p = _parent->cast<StreamingAggOperatorX>();
+    *block = vectorized::VectorizedUtils::create_empty_columnswithtypename(p._row_descriptor);
+    int agg_size = _aggregate_evaluators.size();
+
+    vectorized::MutableColumns columns(agg_size);
+    std::vector<vectorized::DataTypePtr> data_types(agg_size);
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        data_types[i] = _aggregate_evaluators[i]->function()->get_return_type();
+        columns[i] = data_types[i]->create_column();
+    }
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        auto column = columns[i].get();
+        _aggregate_evaluators[i]->insert_result_info(
+                _agg_data->without_key + p._offsets_of_aggregate_states[i], column);
+    }
+
+    const auto& block_schema = block->get_columns_with_type_and_name();
+    DCHECK_EQ(block_schema.size(), columns.size());
+    for (int i = 0; i < block_schema.size(); ++i) {
+        const auto column_type = block_schema[i].type;
+        if (!column_type->equals(*data_types[i])) {
+            if (!vectorized::is_array(remove_nullable(column_type))) {
+                if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
+                    !remove_nullable(column_type)->equals(*data_types[i])) {
+                    return Status::InternalError(
+                            "node id = {}, column_type not match data_types, column_type={}, "
+                            "data_types={}",
+                            _parent->node_id(), column_type->get_name(), data_types[i]->get_name());
+                }
+            }
+
+            if (column_type->is_nullable() && !data_types[i]->is_nullable()) {
+                vectorized::ColumnPtr ptr = std::move(columns[i]);
+                // except for `count`, an aggregate function over an empty set should return NULL,
+                // so check the number of input rows here
+                ptr = make_nullable(ptr, _input_num_rows == 0);
+                columns[i] = ptr->assume_mutable();
+            }
+        }
+    }
+
+    block->set_columns(std::move(columns));
+    source_state = SourceState::FINISHED;
+    return Status::OK();
+}
+
+void StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places,
+                                                      vectorized::ColumnRawPtrs& key_columns,
+                                                      const size_t num_rows) {
+    std::visit(
+            [&](auto&& agg_method) -> void {
+                SCOPED_TIMER(_hash_table_compute_timer);
+                using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using AggState = typename HashMethodType::State;
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
+
+                auto creator = [this](const auto& ctor, auto& key, auto& origin) {
+                    HashMethodType::try_presis_key_and_origin(key, origin, *_agg_arena_pool);
+                    auto mapped = _aggregate_data_container->append_data(origin);
+                    auto st = _create_agg_status(mapped);
+                    if (!st) {
+                        throw Exception(st.code(), st.to_string());
+                    }
+                    ctor(key, mapped);
+                };
+
+                auto creator_for_null_key = [&](auto& mapped) {
+                    mapped = _agg_arena_pool->aligned_alloc(
+                            Base::_parent->template cast<StreamingAggOperatorX>()
+                                    ._total_size_of_aggregate_states,
+                            Base::_parent->template cast<StreamingAggOperatorX>()
+                                    ._align_aggregate_states);
+                    auto st = _create_agg_status(mapped);
+                    if (!st) {
+                        throw Exception(st.code(), st.to_string());
+                    }
+                };
+
+                SCOPED_TIMER(_hash_table_emplace_timer);
+                for (size_t i = 0; i < num_rows; ++i) {
+                    places[i] = agg_method.lazy_emplace(state, i, creator, creator_for_null_key);
+                }
+
+                COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+            },
+            _agg_data->method_variant);
+}
+
+StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool* pool, int operator_id,
+                                             const TPlanNode& tnode, const DescriptorTbl& descs)
+        : StatefulOperatorX<StreamingAggLocalState>(pool, tnode, operator_id, descs),
+          _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
+          _intermediate_tuple_desc(nullptr),
+          _output_tuple_id(tnode.agg_node.output_tuple_id),
+          _output_tuple_desc(nullptr),
+          _needs_finalize(tnode.agg_node.need_finalize),
+          _is_merge(false),
+          _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
+          _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) {}
+
+Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::init(tnode, state));
+    // the return status is ignored for now, so we need to introduce ExecNode::init()
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs, _probe_expr_ctxs));
+
+    // init aggregate functions
+    _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size());
+    // In a case like `select * from (select GoodEvent from hits union select CounterID from hits) as h limit 10;`
+    // i.e. a union with only a limit, the pipeline exec engine can short-circuit the query.
+    _can_short_circuit =
+            tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_exec();
+
+    TSortInfo dummy;
+    for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
+        vectorized::AggFnEvaluator* evaluator = nullptr;
+        RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(
+                _pool, tnode.agg_node.aggregate_functions[i],
+                tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy,
+                &evaluator));
+        _aggregate_evaluators.push_back(evaluator);
+    }
+
+    const auto& agg_functions = tnode.agg_node.aggregate_functions;
+    _external_agg_bytes_threshold = state->external_agg_bytes_threshold();
+
+    if (_external_agg_bytes_threshold > 0) {
+        _spill_partition_count_bits = 4;
+        if (state->query_options().__isset.external_agg_partition_bits) {
+            _spill_partition_count_bits = state->query_options().external_agg_partition_bits;
+        }
+    }
+
+    _is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(),
+                            [](const auto& e) { return e.nodes[0].agg_expr.is_merge_agg; });
+    _op_name = "STREAMING_AGGREGATION_OPERATOR";
+    return Status::OK();
+}
+
+Status StreamingAggOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::prepare(state));
+    _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
+
+    int j = _probe_expr_ctxs.size();
+    for (int i = 0; i < j; ++i) {
+        auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
+        auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
+        if (nullable_output != nullable_input) {
+            DCHECK(nullable_output);
+            _make_nullable_keys.emplace_back(i);
+        }
+    }
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
+        SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
+                state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
+    }
+
+    _offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
+
+    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
+
+        const auto& agg_function = _aggregate_evaluators[i]->function();
+        // aggregate states are aligned based on the maximum alignment requirement
+        _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data());
+        _total_size_of_aggregate_states += agg_function->size_of_data();
+
+        // If this is not the last aggregate state, pad it so that the next state is aligned.
+        if (i + 1 < _aggregate_evaluators.size()) {
+            size_t alignment_of_next_state =
+                    _aggregate_evaluators[i + 1]->function()->align_of_data();
+            if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) {
+                return Status::RuntimeError("Logical error: align_of_data is not 2^N");
+            }
+
+            /// Extend total_size to the next alignment requirement: add padding by rounding
+            /// 'total_size_of_aggregate_states' up to a multiple of alignment_of_next_state.
+            _total_size_of_aggregate_states =
+                    (_total_size_of_aggregate_states + alignment_of_next_state - 1) /
+                    alignment_of_next_state * alignment_of_next_state;
+        }
+    }
+
+    return Status::OK();
+}
+
+Status StreamingAggOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::open(state));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
+
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
+        _aggregate_evaluators[i]->set_version(state->be_exec_version());
+    }
+
+    return Status::OK();
+}
+
+Status StreamingAggLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_close_timer);
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    _pre_aggregated_block->clear();
+    vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places;
+    _places.swap(tmp_places);
+
+    std::vector<char> tmp_deserialize_buffer;
+    _deserialize_buffer.swap(tmp_deserialize_buffer);
+    Base::_mem_tracker->release(_mem_usage_record.used_in_state + _mem_usage_record.used_in_arena);
+
+    /// _hash_table_size_counter may be null if prepare failed.
+    if (_hash_table_size_counter) {
+        std::visit(
+                [&](auto&& agg_method) {
+                    COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.hash_table->size()));
+                },
+                _agg_data->method_variant);
+    }
+    return Base::close(state);
+}
+
+Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block,
+                                   SourceState& source_state) const {
+    auto& local_state = get_local_state(state);
+    if (!local_state._pre_aggregated_block->empty()) {
+        local_state._pre_aggregated_block->swap(*block);
+    } else {
+        RETURN_IF_ERROR(
+                local_state._executor->get_result(&local_state, state, block, source_state));
+        local_state.make_nullable_output_key(block);
+        // apply the HAVING clause; it must not be executed in the pre-streaming agg
+        RETURN_IF_ERROR(
+                vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
+    }
+    local_state.reached_limit(block, source_state);
+
+    return Status::OK();
+}
+
+Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block,
+                                   SourceState source_state) const {
+    auto& local_state = get_local_state(state);
+    local_state._input_num_rows += in_block->rows();
+    if (in_block->rows() > 0) {
+        RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get()));
+    }
+    in_block->clear_column_data(_child_x->row_desc().num_materialized_slots());
+    return Status::OK();
+}
+
+bool StreamingAggOperatorX::need_more_input_data(RuntimeState* state) const {

Review Comment:
   warning: method 'need_more_input_data' can be made static [readability-convert-member-functions-to-static]
   ```suggestion
   bool StreamingAggOperatorX::need_more_input_data(RuntimeState* state) {
   ```
   be/src/pipeline/exec/streaming_aggregation_operator.h:201:
   ```diff
   - bool need_more_input_data(RuntimeState* state) const override;
   + static bool need_more_input_data(RuntimeState* state) override;
   ```
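One note on the clang-tidy suggestion above: the proposed header change combines `static` with `override`, which is not valid C++. A `static` member function cannot be virtual, so it cannot override anything, and applying the suggestion as written would not compile; when the flagged method is a virtual override, this check's fix cannot be taken as-is. A minimal illustration with simplified stand-in types (not the Doris classes):

```cpp
struct OperatorBase {
    virtual ~OperatorBase() = default;
    virtual bool need_more_input_data(int state) const = 0;
};

struct StreamingAgg : OperatorBase {
    // OK: overrides the virtual member function.
    bool need_more_input_data(int state) const override { return state > 0; }

    // Ill-formed: a static member function cannot be virtual, and
    // 'override' may only appear on virtual member functions.
    // static bool need_more_input_data(int state) override; // compile error
};
```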
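For readers following the code: `prepare()` above packs all aggregate states into one contiguous buffer, rounding the running offset up to the next state's alignment, which must be a power of two. A minimal sketch of that arithmetic; the helper names `align_up` and `is_power_of_two` are illustrative, not Doris APIs:

```cpp
#include <cassert>
#include <cstddef>

// Round n up to the next multiple of align; align must be a power of two
// (the same precondition prepare() enforces with its 2^N check).
constexpr std::size_t align_up(std::size_t n, std::size_t align) {
    return (n + align - 1) / align * align;
}

constexpr bool is_power_of_two(std::size_t align) {
    return align != 0 && (align & (align - 1)) == 0;
}

int main() {
    // A 12-byte state followed by a state with 8-byte alignment
    // needs 4 bytes of padding: offset 12 is rounded up to 16.
    std::size_t total_size = 12;
    std::size_t next_align = 8;
    assert(is_power_of_two(next_align));
    assert(align_up(total_size, next_align) == 16);
    return 0;
}
```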
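`_emplace_into_hash_table` relies on a lazy-emplace style hash-table API: the `creator` callback runs only when a key is seen for the first time, so aggregate state is allocated exactly once per distinct key. A rough analogue using `std::unordered_map::try_emplace`, with toy types rather than Doris' hash methods and arena allocation:

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // key -> aggregate state (here just a count)
    std::unordered_map<std::string, std::unique_ptr<int64_t>> table;
    std::vector<std::string> keys = {"a", "b", "a", "a", "b"};
    std::vector<int64_t*> places(keys.size());

    for (size_t i = 0; i < keys.size(); ++i) {
        // try_emplace constructs the mapped value only for a new key,
        // mirroring the 'creator' callback passed to lazy_emplace.
        auto [it, inserted] = table.try_emplace(keys[i]);
        if (inserted) {
            it->second = std::make_unique<int64_t>(0); // new state per distinct key
        }
        places[i] = it->second.get();
        ++*places[i]; // update the aggregate in place
    }

    for (const auto& [key, count] : table) {
        std::cout << key << " -> " << *count << '\n'; // a -> 3, b -> 2
    }
    return 0;
}
```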
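`_serialize_with_serialized_key_result` drains the aggregation hash table incrementally: a persistent iterator survives across `pull()` calls, each call emits at most `batch_size` rows, and the source is marked FINISHED only once the iterator reaches the end. A self-contained sketch of the pattern; `BatchedDrain` and `next_batch` are invented names standing in for Doris' container and `SourceState`:

```cpp
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Toy analogue of draining an aggregation table in batches across calls.
struct BatchedDrain {
    std::map<int, long> table;           // key -> aggregated value
    std::map<int, long>::iterator iter;  // persists between calls
    bool started = false;

    // Emits at most batch_size rows; sets finished once the table is exhausted.
    std::vector<std::pair<int, long>> next_batch(std::size_t batch_size, bool& finished) {
        if (!started) { // like init_once(): position the iterator on first use
            iter = table.begin();
            started = true;
        }
        std::vector<std::pair<int, long>> out;
        while (iter != table.end() && out.size() < batch_size) {
            out.emplace_back(iter->first, iter->second);
            ++iter;
        }
        finished = (iter == table.end());
        return out;
    }
};
```

With `table = {{1, 10}, {2, 20}, {3, 30}}` and a batch size of 2, the first call yields two rows with `finished == false` and the second yields one row with `finished == true`, matching how the operator returns one block per `pull()` until `SourceState::FINISHED`.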
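Finally, several of the functions above (`_emplace_into_hash_table`, `close()`, the result serialization) dispatch over `_agg_data->method_variant` with `std::visit`, so the concrete hash-method type is resolved once per call instead of once per row. A minimal sketch of that pattern with toy hash methods, not the Doris types:

```cpp
#include <cstddef>
#include <iostream>
#include <variant>

struct HashMethodInt {
    void emplace_rows(std::size_t n) { std::cout << "int keys: " << n << " rows\n"; }
};
struct HashMethodSerialized {
    void emplace_rows(std::size_t n) { std::cout << "serialized keys: " << n << " rows\n"; }
};

using MethodVariant = std::variant<HashMethodInt, HashMethodSerialized>;

void emplace_into_hash_table(MethodVariant& method, std::size_t num_rows) {
    // One visit resolves the variant alternative; the per-row work inside
    // each branch is then compiled against the concrete method type.
    std::visit([&](auto&& m) { m.emplace_rows(num_rows); }, method);
}

int main() {
    MethodVariant m = HashMethodSerialized {};
    emplace_into_hash_table(m, 1024);
    return 0;
}
```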