This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1c6246f7ee [improve](agg) support distinct agg node (#22169) 1c6246f7ee is described below commit 1c6246f7ee922c884d32d5908664b8bb9a0e029e Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Fri Jul 28 13:54:10 2023 +0800 [improve](agg) support distinct agg node (#22169) select c_name from customer union select c_name from customer this sql used agg node to get distinct row of c_name, so it's no need to wait for inserted all data to hash map, could output the data which it's inserted into hash map successed. --- be/src/exec/exec_node.cpp | 7 +- ...istinct_streaming_aggregation_sink_operator.cpp | 97 +++++++++++++++ .../distinct_streaming_aggregation_sink_operator.h | 76 ++++++++++++ ...tinct_streaming_aggregation_source_operator.cpp | 92 ++++++++++++++ ...istinct_streaming_aggregation_source_operator.h | 67 ++++++++++ be/src/pipeline/pipeline_fragment_context.cpp | 17 ++- be/src/vec/exec/distinct_vaggregation_node.cpp | 136 +++++++++++++++++++++ be/src/vec/exec/distinct_vaggregation_node.h | 55 +++++++++ be/src/vec/exec/vaggregation_node.cpp | 25 ++-- be/src/vec/exec/vaggregation_node.h | 49 ++++---- .../expressions/functions/agg/WindowFunnel.java | 8 +- 11 files changed, 588 insertions(+), 41 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index c6b7826deb..bcb0771eda 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -44,6 +44,7 @@ #include "util/uid_util.h" #include "vec/columns/column_nullable.h" #include "vec/core/block.h" +#include "vec/exec/distinct_vaggregation_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/join/vnested_loop_join_node.h" #include "vec/exec/scan/new_es_scan_node.h" @@ -371,7 +372,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::AGGREGATION_NODE: - *node = pool->add(new 
vectorized::AggregationNode(pool, tnode, descs)); + if (tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_exec()) { + *node = pool->add(new vectorized::DistinctAggregationNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs)); + } return Status::OK(); case TPlanNodeType::HASH_JOIN_NODE: diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp new file mode 100644 index 0000000000..48695ed56f --- /dev/null +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "distinct_streaming_aggregation_sink_operator.h" + +#include <gen_cpp/Metrics_types.h> + +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "pipeline/exec/data_queue.h" +#include "pipeline/exec/operator.h" +#include "vec/exec/distinct_vaggregation_node.h" +#include "vec/exec/vaggregation_node.h" + +namespace doris { +class ExecNode; +class RuntimeState; +} // namespace doris + +namespace doris::pipeline { + +DistinctStreamingAggSinkOperator::DistinctStreamingAggSinkOperator( + OperatorBuilderBase* operator_builder, ExecNode* agg_node, std::shared_ptr<DataQueue> queue) + : StreamingOperator(operator_builder, agg_node), _data_queue(std::move(queue)) {} + +bool DistinctStreamingAggSinkOperator::can_write() { + // sink and source in diff threads + return _data_queue->has_enough_space_to_push(); +} + +Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + if (in_block && in_block->rows() > 0) { + if (_output_block == nullptr) { + _output_block = _data_queue->get_free_block(); + } + RETURN_IF_ERROR( + _node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get())); + + // get enough data or reached limit rows, need push block to queue + if (_node->limit() != -1 && + (_output_block->rows() + _output_distinct_rows) >= _node->limit()) { + auto limit_rows = _node->limit() - _output_distinct_rows; // rows still allowed under the limit, not limit minus this block's size + _output_block->set_num_rows(limit_rows); + _output_distinct_rows += limit_rows; + _data_queue->push_block(std::move(_output_block)); + } else if (_output_block->rows() >= state->batch_size()) { + _output_distinct_rows += _output_block->rows(); + _data_queue->push_block(std::move(_output_block)); + } + } + + // reach limit or source finish + if ((UNLIKELY(source_state == SourceState::FINISHED)) || reached_limited_rows()) { + if (_output_block != nullptr) { //maybe the last block with eos + _output_distinct_rows += _output_block->rows(); +
_data_queue->push_block(std::move(_output_block)); + } + _data_queue->set_finish(); + return Status::Error<ErrorCode::END_OF_FILE>(""); + } + return Status::OK(); +} + +Status DistinctStreamingAggSinkOperator::close(RuntimeState* state) { + if (_data_queue && !_data_queue->is_finish()) { + // finish should be set, if not set here means error. + _data_queue->set_canceled(); + } + return StreamingOperator::close(state); +} + +DistinctStreamingAggSinkOperatorBuilder::DistinctStreamingAggSinkOperatorBuilder( + int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue) + : OperatorBuilder(id, "DistinctStreamingAggSinkOperator", exec_node), + _data_queue(std::move(queue)) {} + +OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() { + return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h new file mode 100644 index 0000000000..ae7106178e --- /dev/null +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <cstdint> +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exec/distinct_vaggregation_node.h" +#include "vec/exec/vaggregation_node.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace pipeline { +class DataQueue; + +class DistinctStreamingAggSinkOperatorBuilder final + : public OperatorBuilder<vectorized::DistinctAggregationNode> { +public: + DistinctStreamingAggSinkOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>); + + OperatorPtr build_operator() override; + + bool is_sink() const override { return true; } + bool is_source() const override { return false; } + +private: + std::shared_ptr<DataQueue> _data_queue; +}; + +class DistinctStreamingAggSinkOperator final + : public StreamingOperator<DistinctStreamingAggSinkOperatorBuilder> { +public: + DistinctStreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*, + std::shared_ptr<DataQueue>); + + Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override; + + bool can_write() override; + + Status close(RuntimeState* state) override; + + bool reached_limited_rows() { + return _node->limit() != -1 && _output_distinct_rows >= _node->limit(); // done once exactly `limit` rows were emitted + } + +private: + int64_t _output_distinct_rows = 0; + std::shared_ptr<DataQueue> _data_queue; + std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique(); +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp new file mode 100644 index 0000000000..f91fd3fbe3 --- /dev/null +++
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "distinct_streaming_aggregation_source_operator.h" + +#include <utility> + +#include "pipeline/exec/data_queue.h" +#include "pipeline/exec/operator.h" +#include "runtime/descriptors.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exec/distinct_vaggregation_node.h" +#include "vec/exec/vaggregation_node.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace pipeline { +DistinctStreamingAggSourceOperator::DistinctStreamingAggSourceOperator( + OperatorBuilderBase* templ, ExecNode* node, std::shared_ptr<DataQueue> queue) + : SourceOperator(templ, node), _data_queue(std::move(queue)) {} + +bool DistinctStreamingAggSourceOperator::can_read() { + return _data_queue->has_data_or_finished(); +} + +Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, + bool* eos) { + std::unique_ptr<vectorized::Block> agg_block; + RETURN_IF_ERROR(_data_queue->get_block_from_queue(&agg_block)); + if (agg_block != nullptr) { + block->swap(*agg_block); + 
agg_block->clear_column_data(_node->row_desc().num_materialized_slots()); + _data_queue->push_free_block(std::move(agg_block)); + } + if (_data_queue->data_exhausted()) { //the sink is eos or reached limit + *eos = true; + } + _node->_make_nullable_output_key(block); + if (_node->is_streaming_preagg() == false) { + // dispose the having clause, should not be execute in prestreaming agg + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_node->get_conjuncts(), block, + block->columns())); + } + + rows_have_returned += block->rows(); + return Status::OK(); +} + +Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + bool eos = false; + RETURN_IF_ERROR(_node->get_next_after_projects( + state, block, &eos, + std::bind(&DistinctStreamingAggSourceOperator::pull_data, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3))); + if (UNLIKELY(eos)) { + _node->set_num_rows_returned(rows_have_returned); + source_state = SourceState::FINISHED; + } else { + source_state = SourceState::DEPEND_ON_SOURCE; + } + return Status::OK(); +} + +DistinctStreamingAggSourceOperatorBuilder::DistinctStreamingAggSourceOperatorBuilder( + int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue) + : OperatorBuilder(id, "DistinctStreamingAggSourceOperator", exec_node), + _data_queue(std::move(queue)) {} + +OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() { + return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue); +} + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h new file mode 100644 index 0000000000..3534193bf8 --- /dev/null +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <stdint.h> + +#include <cstdint> +#include <memory> + +#include "common/status.h" +#include "operator.h" +#include "vec/exec/distinct_vaggregation_node.h" +#include "vec/exec/vaggregation_node.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized +namespace pipeline { +class DataQueue; + +class DistinctStreamingAggSourceOperatorBuilder final + : public OperatorBuilder<vectorized::DistinctAggregationNode> { +public: + DistinctStreamingAggSourceOperatorBuilder(int32_t, ExecNode*, std::shared_ptr<DataQueue>); + + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; + +private: + std::shared_ptr<DataQueue> _data_queue; +}; + +class DistinctStreamingAggSourceOperator final + : public SourceOperator<DistinctStreamingAggSourceOperatorBuilder> { +public: + DistinctStreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<DataQueue>); + bool can_read() override; + Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override; + Status open(RuntimeState*) override { return Status::OK(); } + Status pull_data(RuntimeState* state, vectorized::Block* output_block, 
bool* eos); + +private: + int64_t rows_have_returned = 0; + std::shared_ptr<DataQueue> _data_queue; +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 3c4957ad40..fe5e98dde8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -48,6 +48,8 @@ #include "pipeline/exec/const_value_operator.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/datagen_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h" +#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h" #include "pipeline/exec/empty_set_operator.h" #include "pipeline/exec/empty_source_operator.h" #include "pipeline/exec/exchange_sink_operator.h" @@ -503,10 +505,21 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::AGGREGATION_NODE: { - auto* agg_node = assert_cast<vectorized::AggregationNode*>(node); + auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node); auto new_pipe = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe)); - if (agg_node->is_streaming_preagg()) { + if (agg_node->is_aggregate_evaluators_empty()) { + auto data_queue = std::make_shared<DataQueue>(1); + OperatorBuilderPtr pre_agg_sink = + std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node, + data_queue); + RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink)); + + OperatorBuilderPtr pre_agg_source = + std::make_shared<DistinctStreamingAggSourceOperatorBuilder>( + node->id(), agg_node, data_queue); + RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); + } else if (agg_node->is_streaming_preagg()) { auto data_queue = std::make_shared<DataQueue>(1); OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>( node->id(), agg_node, data_queue); 
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp new file mode 100644 index 0000000000..bbbd196411 --- /dev/null +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/distinct_vaggregation_node.h" + +#include "runtime/runtime_state.h" +#include "vec/aggregate_functions/aggregate_function_uniq.h" +#include "vec/exec/vaggregation_node.h" + +namespace doris { +class ObjectPool; +} // namespace doris + +namespace doris::vectorized { + +DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : AggregationNode(pool, tnode, descs) { + dummy_mapped_data = pool->add(new char('A')); +} + +Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key( + doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { + SCOPED_TIMER(_build_timer); + DCHECK(!_probe_expr_ctxs.empty()); + + size_t key_size = _probe_expr_ctxs.size(); + ColumnRawPtrs key_columns(key_size); + { + SCOPED_TIMER(_expr_timer); + for (size_t i = 0; i < key_size; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id)); + in_block->get_by_position(result_column_id).column = + in_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + key_columns[i] = in_block->get_by_position(result_column_id).column.get(); + } + } + + int rows = in_block->rows(); + IColumn::Selector distinct_row; + distinct_row.reserve(rows); + + RETURN_IF_CATCH_EXCEPTION( + _emplace_into_hash_table_to_distinct(distinct_row, key_columns, rows)); + + bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse(); + if (mem_reuse) { + for (int i = 0; i < key_size; ++i) { + auto dst = out_block->get_by_position(i).column->assume_mutable(); + key_columns[i]->append_data_by_selector(dst, distinct_row); + } + } else { + ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + auto distinct_column = key_columns[i]->clone_empty(); + key_columns[i]->append_data_by_selector(distinct_column, distinct_row); + columns_with_schema.emplace_back(std::move(distinct_column), + 
_probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + out_block->swap(Block(columns_with_schema)); + } + return Status::OK(); +} + +void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row, + ColumnRawPtrs& key_columns, + const size_t num_rows) { + std::visit( + [&](auto&& agg_method) -> void { + SCOPED_TIMER(_hash_table_compute_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; + using AggState = typename HashMethodType::State; + AggState state(key_columns, _probe_key_sz, nullptr); + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = + agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); + } + } + } + + /// For all rows. 
+ COUNTER_UPDATE(_hash_table_input_counter, num_rows); + for (size_t i = 0; i < num_rows; ++i) { + auto emplace_result = [&]() { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[i + HASH_MAP_PREFETCH_DIST]); + } + return state.emplace_key(agg_method.data, _hash_values[i], i, + *_agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, i, *_agg_arena_pool); + } + }(); + + if (emplace_result.is_inserted()) { + emplace_result.set_mapped(dummy_mapped_data); + distinct_row.push_back(i); + } + } + }, + _agg_data->_aggregated_method_variant); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/distinct_vaggregation_node.h b/be/src/vec/exec/distinct_vaggregation_node.h new file mode 100644 index 0000000000..f5ca0ceebb --- /dev/null +++ b/be/src/vec/exec/distinct_vaggregation_node.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once +#include <cstdint> +#include <memory> + +#include "vec/exec/vaggregation_node.h" +#include "vec/exprs/vectorized_agg_fn.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/exprs/vslot_ref.h" + +namespace doris { +class TPlanNode; +class DescriptorTbl; +class ObjectPool; +class RuntimeState; + +namespace vectorized { + +// select c_name from customer union select c_name from customer +// this sql used agg node to get distinct row of c_name, +// so it's could output data when it's inserted into hashmap. +// phase1: (_is_merge:false, _needs_finalize:false, Streaming Preaggregation:true, agg size:0, limit:-1) +// phase2: (_is_merge:false, _needs_finalize:true, Streaming Preaggregation:false,agg size:0, limit:-1) +class DistinctAggregationNode final : public AggregationNode { +public: + DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~DistinctAggregationNode() override = default; + Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block); + void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; } + vectorized::VExprContextSPtrs get_conjuncts() { return _conjuncts; } + +private: + char* dummy_mapped_data = nullptr; + void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row, + ColumnRawPtrs& key_columns, const size_t num_rows); +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index d7ebfe0a51..f4da6d9aaf 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -17,6 +17,7 @@ #include "vec/exec/vaggregation_node.h" +#include <fmt/format.h> #include <gen_cpp/Exprs_types.h> #include <gen_cpp/Metrics_types.h> #include <gen_cpp/PlanNodes_types.h> @@ -25,6 +26,7 @@ #include <array> #include <atomic> #include <memory> +#include <string> #include "exec/exec_node.h" #include 
"runtime/block_spill_manager.h" @@ -102,27 +104,27 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), + _hash_table_compute_timer(nullptr), + _hash_table_input_counter(nullptr), + _build_timer(nullptr), + _expr_timer(nullptr), + _exec_timer(nullptr), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), _output_tuple_id(tnode.agg_node.output_tuple_id), _output_tuple_desc(nullptr), _needs_finalize(tnode.agg_node.need_finalize), _is_merge(false), - _build_timer(nullptr), _serialize_key_timer(nullptr), - _exec_timer(nullptr), _merge_timer(nullptr), - _expr_timer(nullptr), _get_results_timer(nullptr), _serialize_data_timer(nullptr), _serialize_result_timer(nullptr), _deserialize_data_timer(nullptr), - _hash_table_compute_timer(nullptr), _hash_table_iterate_timer(nullptr), _insert_keys_to_column_timer(nullptr), _streaming_agg_timer(nullptr), _hash_table_size_counter(nullptr), - _hash_table_input_counter(nullptr), _max_row_size_counter(nullptr) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; @@ -454,7 +456,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { } if (_is_streaming_preagg) { - runtime_profile()->append_exec_option("Streaming Preaggregation"); _executor.pre_agg = std::bind<Status>(&AggregationNode::_pre_agg_with_serialized_key, this, std::placeholders::_1, std::placeholders::_2); @@ -478,6 +479,14 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _needs_finalize; // agg's finalize step } + fmt::memory_buffer msg; + fmt::format_to(msg, + "(_is_merge: {}, _needs_finalize: {}, Streaming Preaggregation: {}, agg size: " + "{}, limit: {})", + _is_merge ? "true" : "false", _needs_finalize ? "true" : "false", + _is_streaming_preagg ? 
"true" : "false", + std::to_string(_aggregate_evaluators.size()), std::to_string(_limit)); + runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg)); return Status::OK(); } @@ -918,7 +927,9 @@ void AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); if constexpr (HashTableTraits<HashTableType>::is_phmap) { - if (_hash_values.size() < num_rows) _hash_values.resize(num_rows); + if (_hash_values.size() < num_rows) { + _hash_values.resize(num_rows); + } if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< AggState>::value) { for (size_t i = 0; i < num_rows; ++i) { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index e31240cdbc..7d560dae89 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -818,7 +818,7 @@ struct SpillPartitionHelper { }; // not support spill -class AggregationNode final : public ::doris::ExecNode { +class AggregationNode : public ::doris::ExecNode { public: using Sizes = std::vector<size_t>; @@ -836,18 +836,34 @@ public: Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); bool is_streaming_preagg() const { return _is_streaming_preagg; } + bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); } + void _make_nullable_output_key(Block* block); -private: - friend class pipeline::AggSinkOperator; - friend class pipeline::StreamingAggSinkOperator; - friend class pipeline::AggSourceOperator; - friend class pipeline::StreamingAggSourceOperator; +protected: + bool _is_streaming_preagg; + bool _child_eos = false; + Block _preagg_block = Block(); + ArenaUPtr _agg_arena_pool; // group by k1,k2 VExprContextSPtrs _probe_expr_ctxs; + AggregatedDataVariantsUPtr _agg_data; + + std::vector<size_t> _probe_key_sz; + 
std::vector<size_t> _hash_values; // left / full join will change the key nullable make output/input solt // nullable diff. so we need make nullable of it. std::vector<size_t> _make_nullable_keys; - std::vector<size_t> _probe_key_sz; + RuntimeProfile::Counter* _hash_table_compute_timer; + RuntimeProfile::Counter* _hash_table_input_counter; + RuntimeProfile::Counter* _build_timer; + RuntimeProfile::Counter* _expr_timer; + RuntimeProfile::Counter* _exec_timer; + +private: + friend class pipeline::AggSinkOperator; + friend class pipeline::StreamingAggSinkOperator; + friend class pipeline::AggSourceOperator; + friend class pipeline::StreamingAggSourceOperator; std::vector<AggFnEvaluator*> _aggregate_evaluators; bool _can_short_circuit = false; @@ -873,47 +889,32 @@ private: size_t _external_agg_bytes_threshold; size_t _partitioned_threshold = 0; - AggregatedDataVariantsUPtr _agg_data; - AggSpillContext _spill_context; std::unique_ptr<SpillPartitionHelper> _spill_partition_helper; - ArenaUPtr _agg_arena_pool; - - RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _build_table_convert_timer; RuntimeProfile::Counter* _serialize_key_timer; - RuntimeProfile::Counter* _exec_timer; RuntimeProfile::Counter* _merge_timer; - RuntimeProfile::Counter* _expr_timer; RuntimeProfile::Counter* _get_results_timer; RuntimeProfile::Counter* _serialize_data_timer; RuntimeProfile::Counter* _serialize_result_timer; RuntimeProfile::Counter* _deserialize_data_timer; - RuntimeProfile::Counter* _hash_table_compute_timer; RuntimeProfile::Counter* _hash_table_iterate_timer; RuntimeProfile::Counter* _insert_keys_to_column_timer; RuntimeProfile::Counter* _streaming_agg_timer; RuntimeProfile::Counter* _hash_table_size_counter; - RuntimeProfile::Counter* _hash_table_input_counter; RuntimeProfile::Counter* _max_row_size_counter; - RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::Counter* _hash_table_memory_usage; RuntimeProfile::HighWaterMarkCounter* 
_serialize_key_arena_memory_usage; - bool _is_streaming_preagg; - Block _preagg_block = Block(); bool _should_expand_hash_table = true; - bool _child_eos = false; - bool _should_limit_output = false; bool _reach_limit = false; bool _agg_data_created_without_key = false; PODArray<AggregateDataPtr> _places; std::vector<char> _deserialize_buffer; - std::vector<size_t> _hash_values; std::vector<AggregateDataPtr> _values; std::unique_ptr<AggregateDataContainer> _aggregate_data_container; @@ -924,8 +925,6 @@ private: size_t _get_hash_table_size(); - void _make_nullable_output_key(Block* block); - Status _create_agg_status(AggregateDataPtr data); Status _destroy_agg_status(AggregateDataPtr data); @@ -956,6 +955,7 @@ private: void _close_with_serialized_key(); void _init_hash_method(const VExprContextSPtrs& probe_exprs); +protected: template <typename AggState, typename AggMethod> void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method, const ColumnRawPtrs& key_columns, const size_t num_rows) { @@ -970,6 +970,7 @@ private: } } +private: template <bool limit> Status _execute_with_serialized_key_helper(Block* block) { SCOPED_TIMER(_build_timer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java index c37d66471d..d19f63f658 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java @@ -31,7 +31,6 @@ import org.apache.doris.nereids.types.DateType; import org.apache.doris.nereids.types.DateV2Type; import org.apache.doris.nereids.types.IntegerType; import org.apache.doris.nereids.types.StringType; -import org.apache.doris.nereids.types.VarcharType; import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.base.Preconditions; 
@@ -47,12 +46,7 @@ public class WindowFunnel extends AggregateFunction public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( FunctionSignature.ret(IntegerType.INSTANCE) - .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE), - FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT) - .args(BigIntType.INSTANCE, - StringType.INSTANCE, - DateTimeV2Type.SYSTEM_DEFAULT, - BooleanType.INSTANCE) + .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE) ); /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org