This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c6246f7ee [improve](agg) support distinct agg node (#22169)
1c6246f7ee is described below

commit 1c6246f7ee922c884d32d5908664b8bb9a0e029e
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Fri Jul 28 13:54:10 2023 +0800

    [improve](agg) support distinct agg node (#22169)
    
    select c_name from customer union select c_name from customer
    This SQL uses an agg node only to get the distinct rows of c_name,
    so there is no need to wait until all data has been inserted into the hash map;
    each row can be output as soon as it has been successfully inserted into the hash map.
---
 be/src/exec/exec_node.cpp                          |   7 +-
 ...istinct_streaming_aggregation_sink_operator.cpp |  97 +++++++++++++++
 .../distinct_streaming_aggregation_sink_operator.h |  76 ++++++++++++
 ...tinct_streaming_aggregation_source_operator.cpp |  92 ++++++++++++++
 ...istinct_streaming_aggregation_source_operator.h |  67 ++++++++++
 be/src/pipeline/pipeline_fragment_context.cpp      |  17 ++-
 be/src/vec/exec/distinct_vaggregation_node.cpp     | 136 +++++++++++++++++++++
 be/src/vec/exec/distinct_vaggregation_node.h       |  55 +++++++++
 be/src/vec/exec/vaggregation_node.cpp              |  25 ++--
 be/src/vec/exec/vaggregation_node.h                |  49 ++++----
 .../expressions/functions/agg/WindowFunnel.java    |   8 +-
 11 files changed, 588 insertions(+), 41 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c6b7826deb..bcb0771eda 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -44,6 +44,7 @@
 #include "util/uid_util.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/join/vnested_loop_join_node.h"
 #include "vec/exec/scan/new_es_scan_node.h"
@@ -371,7 +372,11 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
         return Status::OK();
 
     case TPlanNodeType::AGGREGATION_NODE:
-        *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs));
+        if (tnode.agg_node.aggregate_functions.empty() && 
state->enable_pipeline_exec()) {
+            *node = pool->add(new vectorized::DistinctAggregationNode(pool, 
tnode, descs));
+        } else {
+            *node = pool->add(new vectorized::AggregationNode(pool, tnode, 
descs));
+        }
         return Status::OK();
 
     case TPlanNodeType::HASH_JOIN_NODE:
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
new file mode 100644
index 0000000000..48695ed56f
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_sink_operator.h"
+
+#include <gen_cpp/Metrics_types.h>
+
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::pipeline {
+
+DistinctStreamingAggSinkOperator::DistinctStreamingAggSinkOperator(
+        OperatorBuilderBase* operator_builder, ExecNode* agg_node, 
std::shared_ptr<DataQueue> queue)
+        : StreamingOperator(operator_builder, agg_node), 
_data_queue(std::move(queue)) {}
+
+// Tells the scheduler whether the sink may push another block: writing is
+// allowed only while the data queue reports enough free space.
+bool DistinctStreamingAggSinkOperator::can_write() {
+    // The sink and the source run in different threads, so the queue is the
+    // only place where back-pressure can be observed.
+    const bool queue_has_room = _data_queue->has_enough_space_to_push();
+    return queue_has_room;
+}
+
+// Runs one input block through the distinct hash table and forwards the rows
+// with newly seen keys to the data queue.
+//
+//   state        - supplies batch_size(), the row count at which a pending
+//                  output block is pushed
+//   in_block     - input rows; a null or empty block skips the aggregation step
+//   source_state - FINISHED flushes the pending block and finishes the queue
+//
+// Returns END_OF_FILE once the queue has been finished (source finished or the
+// node's limit reached), an error from the aggregation step, or OK otherwise.
+Status DistinctStreamingAggSinkOperator::sink(RuntimeState* state, vectorized::Block* in_block,
+                                              SourceState source_state) {
+    const int64_t limit = _node->limit(); // -1 means "no limit"
+
+    if (in_block && in_block->rows() > 0) {
+        if (_output_block == nullptr) {
+            _output_block = _data_queue->get_free_block();
+        }
+        RETURN_IF_ERROR(
+                _node->_distinct_pre_agg_with_serialized_key(in_block, _output_block.get()));
+
+        // get enough data or reached limit rows, need push block to queue
+        const int64_t pending_rows = static_cast<int64_t>(_output_block->rows());
+        if (limit != -1 && (pending_rows + _output_distinct_rows) >= limit) {
+            // Keep only the rows still missing to reach the limit. The rows
+            // already emitted (_output_distinct_rows) must be subtracted, not
+            // the size of the pending block: the latter wraps around whenever
+            // the block holds more rows than the limit.
+            const int64_t limit_rows = limit - _output_distinct_rows;
+            _output_block->set_num_rows(limit_rows);
+            _output_distinct_rows += limit_rows;
+            _data_queue->push_block(std::move(_output_block));
+        } else if (pending_rows >= state->batch_size()) {
+            _output_distinct_rows += pending_rows;
+            _data_queue->push_block(std::move(_output_block));
+        }
+    }
+
+    // reach limit or source finish
+    // The limit test is evaluated here with ">=" so that emitting exactly
+    // `limit` rows ends the stream.
+    const bool limit_reached = limit != -1 && _output_distinct_rows >= limit;
+    if ((UNLIKELY(source_state == SourceState::FINISHED)) || limit_reached) {
+        if (_output_block != nullptr) { // maybe the last block with eos
+            _output_distinct_rows += _output_block->rows();
+            _data_queue->push_block(std::move(_output_block));
+        }
+        _data_queue->set_finish();
+        return Status::Error<ErrorCode::END_OF_FILE>("");
+    }
+    return Status::OK();
+}
+
+// Closes the sink. If the queue exists but was never marked finished, the
+// operator is being closed on an error path, so the queue is cancelled before
+// the base class close runs.
+Status DistinctStreamingAggSinkOperator::close(RuntimeState* state) {
+    const bool finished_or_absent = (_data_queue == nullptr) || _data_queue->is_finish();
+    if (!finished_or_absent) {
+        // finish should be set, if not set here means error.
+        _data_queue->set_canceled();
+    }
+    return StreamingOperator::close(state);
+}
+
+DistinctStreamingAggSinkOperatorBuilder::DistinctStreamingAggSinkOperatorBuilder(
+        int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+        : OperatorBuilder(id, "DistinctStreamingAggSinkOperator", exec_node),
+          _data_queue(std::move(queue)) {}
+
+// Creates a new sink operator bound to this builder, its node and its queue.
+OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
+    auto sink_operator =
+            std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue);
+    return sink_operator;
+}
+
+} // namespace doris::pipeline
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
new file mode 100644
index 0000000000..ae7106178e
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+// Builder that produces DistinctStreamingAggSinkOperator instances for a
+// DistinctAggregationNode.
+class DistinctStreamingAggSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+    DistinctStreamingAggSinkOperatorBuilder(int32_t id, ExecNode* exec_node,
+                                            std::shared_ptr<DataQueue> queue);
+
+    // This builder describes a sink and never a source.
+    bool is_source() const override { return false; }
+    bool is_sink() const override { return true; }
+
+    OperatorPtr build_operator() override;
+
+private:
+    // Queue handle passed to every operator created by build_operator().
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Sink operator of the distinct streaming aggregation: consumes input blocks
+// and hands the resulting output blocks to a DataQueue.
+class DistinctStreamingAggSinkOperator final
+        : public StreamingOperator<DistinctStreamingAggSinkOperatorBuilder> {
+public:
+    DistinctStreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
+                                     std::shared_ptr<DataQueue>);
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
+
+    bool can_write() override;
+
+    Status close(RuntimeState* state) override;
+
+    // True when the node has a limit (limit() != -1) and at least that many
+    // distinct rows have already been counted as output. The comparison is
+    // ">=": emitting exactly `limit` rows already means the limit is reached.
+    bool reached_limited_rows() {
+        return _node->limit() != -1 && _output_distinct_rows >= _node->limit();
+    }
+
+private:
+    // Number of distinct rows counted as pushed to the queue so far.
+    int64_t _output_distinct_rows = 0;
+    std::shared_ptr<DataQueue> _data_queue;
+    // Block currently being filled; null after it has been moved into the queue.
+    std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
new file mode 100644
index 0000000000..f91fd3fbe3
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "distinct_streaming_aggregation_source_operator.h"
+
+#include <utility>
+
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/descriptors.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace pipeline {
+DistinctStreamingAggSourceOperator::DistinctStreamingAggSourceOperator(
+        OperatorBuilderBase* templ, ExecNode* node, std::shared_ptr<DataQueue> 
queue)
+        : SourceOperator(templ, node), _data_queue(std::move(queue)) {}
+
+// The source is readable as soon as the queue holds data or has been finished.
+bool DistinctStreamingAggSourceOperator::can_read() {
+    const bool readable = _data_queue->has_data_or_finished();
+    return readable;
+}
+
+// Takes the next block from the data queue and prepares it for output.
+//   state - unused here; part of the callback signature
+//   block - receives the contents of the dequeued block
+//   eos   - set to true once the queue reports that its data is exhausted
+// Returns an error from the queue or from conjunct filtering, otherwise OK.
+Status DistinctStreamingAggSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block,
+                                                     bool* eos) {
+    std::unique_ptr<vectorized::Block> queued_block;
+    RETURN_IF_ERROR(_data_queue->get_block_from_queue(&queued_block));
+
+    if (queued_block) {
+        block->swap(*queued_block);
+        // Clear the swapped-out block and give it back to the queue's free list.
+        queued_block->clear_column_data(_node->row_desc().num_materialized_slots());
+        _data_queue->push_free_block(std::move(queued_block));
+    }
+
+    // the sink is eos or reached limit
+    if (_data_queue->data_exhausted()) {
+        *eos = true;
+    }
+
+    _node->_make_nullable_output_key(block);
+
+    // dispose the having clause, should not be execute in prestreaming agg
+    if (!_node->is_streaming_preagg()) {
+        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_node->get_conjuncts(), block,
+                                                               block->columns()));
+    }
+
+    rows_have_returned += block->rows();
+    return Status::OK();
+}
+
+// Produces the next output block by letting the node apply its projections on
+// top of pull_data(). source_state becomes FINISHED at end of stream and
+// DEPEND_ON_SOURCE otherwise.
+Status DistinctStreamingAggSourceOperator::get_block(RuntimeState* state, vectorized::Block* block,
+                                                     SourceState& source_state) {
+    bool eos = false;
+    auto puller = [this](RuntimeState* pull_state, vectorized::Block* pull_block,
+                         bool* pull_eos) { return pull_data(pull_state, pull_block, pull_eos); };
+    RETURN_IF_ERROR(_node->get_next_after_projects(state, block, &eos, puller));
+
+    if (UNLIKELY(eos)) {
+        // Publish the total row count to the node once the stream has ended.
+        _node->set_num_rows_returned(rows_have_returned);
+    }
+    source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE;
+    return Status::OK();
+}
+
+DistinctStreamingAggSourceOperatorBuilder::DistinctStreamingAggSourceOperatorBuilder(
+        int32_t id, ExecNode* exec_node, std::shared_ptr<DataQueue> queue)
+        : OperatorBuilder(id, "DistinctStreamingAggSourceOperator", exec_node),
+          _data_queue(std::move(queue)) {}
+
+// Creates a new source operator bound to this builder, its node and its queue.
+OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
+    auto source_operator =
+            std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue);
+    return source_operator;
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
new file mode 100644
index 0000000000..3534193bf8
--- /dev/null
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/exec/distinct_vaggregation_node.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ExecNode;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+namespace pipeline {
+class DataQueue;
+
+// Builder that produces DistinctStreamingAggSourceOperator instances for a
+// DistinctAggregationNode.
+class DistinctStreamingAggSourceOperatorBuilder final
+        : public OperatorBuilder<vectorized::DistinctAggregationNode> {
+public:
+    DistinctStreamingAggSourceOperatorBuilder(int32_t id, ExecNode* exec_node,
+                                              std::shared_ptr<DataQueue> queue);
+
+    OperatorPtr build_operator() override;
+
+    // This builder describes a source operator.
+    bool is_source() const override { return true; }
+
+private:
+    // Queue handle passed to every operator created by build_operator().
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+// Source operator of the distinct streaming aggregation: reads blocks from a
+// DataQueue and returns them to the pipeline.
+class DistinctStreamingAggSourceOperator final
+        : public SourceOperator<DistinctStreamingAggSourceOperatorBuilder> {
+public:
+    DistinctStreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*,
+                                       std::shared_ptr<DataQueue>);
+
+    // Opening is a no-op for this operator and always reports success.
+    Status open(RuntimeState*) override { return Status::OK(); }
+
+    bool can_read() override;
+
+    Status get_block(RuntimeState*, vectorized::Block*, SourceState& source_state) override;
+
+    // Fetches one block from the queue into output_block and sets *eos when
+    // the queue has no more data.
+    Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);
+
+private:
+    // Running total of the rows returned by pull_data().
+    int64_t rows_have_returned = 0;
+    std::shared_ptr<DataQueue> _data_queue;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 3c4957ad40..fe5e98dde8 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -48,6 +48,8 @@
 #include "pipeline/exec/const_value_operator.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/datagen_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
+#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
 #include "pipeline/exec/empty_source_operator.h"
 #include "pipeline/exec/exchange_sink_operator.h"
@@ -503,10 +505,21 @@ Status 
PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
-        auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
+        auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
-        if (agg_node->is_streaming_preagg()) {
+        if (agg_node->is_aggregate_evaluators_empty()) {
+            auto data_queue = std::make_shared<DataQueue>(1);
+            OperatorBuilderPtr pre_agg_sink =
+                    
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
+                                                                              
data_queue);
+            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+
+            OperatorBuilderPtr pre_agg_source =
+                    
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
+                            node->id(), agg_node, data_queue);
+            RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
+        } else if (agg_node->is_streaming_preagg()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp 
b/be/src/vec/exec/distinct_vaggregation_node.cpp
new file mode 100644
index 0000000000..bbbd196411
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/distinct_vaggregation_node.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/aggregate_functions/aggregate_function_uniq.h"
+#include "vec/exec/vaggregation_node.h"
+
+namespace doris {
+class ObjectPool;
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Constructs the node on top of AggregationNode and allocates, through the
+// object pool, the single placeholder byte that is stored as the mapped value
+// of every hash table entry.
+DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode,
+                                                 const DescriptorTbl& descs)
+        : AggregationNode(pool, tnode, descs) {
+    char* placeholder = pool->add(new char('A'));
+    dummy_mapped_data = placeholder;
+}
+
+// Evaluates the probe (group-by) expressions on in_block, inserts the key of
+// every row into the hash table, and writes to out_block only the rows whose
+// key was newly inserted.
+//
+//   in_block  - input rows; expression result columns are added to it
+//   out_block - receives the distinct rows: appended when its memory can be
+//               reused, otherwise replaced by a freshly built block
+//
+// Returns the first expression error, an error converted from an exception
+// thrown while inserting into the hash table, or OK.
+Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
+        doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
+    SCOPED_TIMER(_build_timer);
+    DCHECK(!_probe_expr_ctxs.empty());
+
+    const size_t key_size = _probe_expr_ctxs.size();
+    ColumnRawPtrs key_columns(key_size);
+    {
+        SCOPED_TIMER(_expr_timer);
+        for (size_t i = 0; i < key_size; ++i) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(in_block, &result_column_id));
+            // Replace a const result column by its full-column form before
+            // taking the raw pointer used as hash key input.
+            in_block->get_by_position(result_column_id).column =
+                    in_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            key_columns[i] = in_block->get_by_position(result_column_id).column.get();
+        }
+    }
+
+    // Keep the row count as size_t: it is what rows() yields and what the
+    // emplace helper expects, so no narrowing to int takes place.
+    const size_t rows = in_block->rows();
+    IColumn::Selector distinct_row;
+    distinct_row.reserve(rows);
+
+    RETURN_IF_CATCH_EXCEPTION(
+            _emplace_into_hash_table_to_distinct(distinct_row, key_columns, rows));
+
+    const bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
+    if (mem_reuse) {
+        // Append the selected rows to the columns out_block already owns.
+        for (size_t i = 0; i < key_size; ++i) {
+            auto dst = out_block->get_by_position(i).column->assume_mutable();
+            key_columns[i]->append_data_by_selector(dst, distinct_row);
+        }
+    } else {
+        // NOTE(review): this path replaces whatever out_block held before. If a
+        // caller accumulates rows across calls while _make_nullable_keys is
+        // non-empty, those earlier rows would be dropped; confirm with callers.
+        ColumnsWithTypeAndName columns_with_schema;
+        for (size_t i = 0; i < key_size; ++i) {
+            auto distinct_column = key_columns[i]->clone_empty();
+            key_columns[i]->append_data_by_selector(distinct_column, distinct_row);
+            columns_with_schema.emplace_back(std::move(distinct_column),
+                                             _probe_expr_ctxs[i]->root()->data_type(),
+                                             _probe_expr_ctxs[i]->root()->expr_name());
+        }
+        out_block->swap(Block(columns_with_schema));
+    }
+    return Status::OK();
+}
+
+// Inserts the key of each of the num_rows rows into the hash table held by
+// _agg_data. For every row whose key was not present before, the placeholder
+// mapped value is stored and the row index is appended to distinct_row.
+void DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
+                                                                   ColumnRawPtrs& key_columns,
+                                                                   const size_t num_rows) {
+    auto insert_rows = [&](auto&& agg_method) -> void {
+        SCOPED_TIMER(_hash_table_compute_timer);
+        using HashMethodType = std::decay_t<decltype(agg_method)>;
+        using HashTableType = std::decay_t<decltype(agg_method.data)>;
+        using AggState = typename HashMethodType::State;
+
+        AggState state(key_columns, _probe_key_sz, nullptr);
+        _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
+
+        // phmap-based tables: compute all hashes up front so the insert loop
+        // below can prefetch ahead by hash.
+        if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+            if (_hash_values.size() < num_rows) {
+                _hash_values.resize(num_rows);
+            }
+            if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
+                for (size_t row = 0; row < num_rows; ++row) {
+                    _hash_values[row] = agg_method.data.hash(agg_method.keys[row]);
+                }
+            } else {
+                for (size_t row = 0; row < num_rows; ++row) {
+                    _hash_values[row] =
+                            agg_method.data.hash(state.get_key_holder(row, *_agg_arena_pool));
+                }
+            }
+        }
+
+        /// For all rows.
+        COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+        for (size_t row = 0; row < num_rows; ++row) {
+            auto emplace_result = [&]() {
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
+                    if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) {
+                        agg_method.data.prefetch_by_hash(
+                                _hash_values[row + HASH_MAP_PREFETCH_DIST]);
+                    }
+                    return state.emplace_key(agg_method.data, _hash_values[row], row,
+                                             *_agg_arena_pool);
+                } else {
+                    return state.emplace_key(agg_method.data, row, *_agg_arena_pool);
+                }
+            }();
+
+            // Keys that were already present contribute nothing to the output.
+            if (!emplace_result.is_inserted()) {
+                continue;
+            }
+            emplace_result.set_mapped(dummy_mapped_data);
+            distinct_row.push_back(row);
+        }
+    };
+
+    std::visit(insert_rows, _agg_data->_aggregated_method_variant);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/distinct_vaggregation_node.h 
b/be/src/vec/exec/distinct_vaggregation_node.h
new file mode 100644
index 0000000000..f5ca0ceebb
--- /dev/null
+++ b/be/src/vec/exec/distinct_vaggregation_node.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <cstdint>
+#include <memory>
+
+#include "vec/exec/vaggregation_node.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+class TPlanNode;
+class DescriptorTbl;
+class ObjectPool;
+class RuntimeState;
+
+namespace vectorized {
+
+// Example: select c_name from customer union select c_name from customer
+// This SQL uses an agg node only to get the distinct rows of c_name, so a row
+// can be output as soon as it has been inserted into the hash map.
+// phase1: (_is_merge:false, _needs_finalize:false, Streaming Preaggregation:true, agg size:0, limit:-1)
+// phase2: (_is_merge:false, _needs_finalize:true,  Streaming Preaggregation:false,agg size:0, limit:-1)
+class DistinctAggregationNode final : public AggregationNode {
+public:
+    DistinctAggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~DistinctAggregationNode() override = default;
+
+    // Inserts the keys of in_block into the hash table and writes the rows
+    // with newly inserted keys to out_block.
+    Status _distinct_pre_agg_with_serialized_key(Block* in_block, Block* out_block);
+
+    void set_num_rows_returned(int64_t rows) { _num_rows_returned = rows; }
+
+    // Returns the conjuncts by const reference so callers that only read them
+    // do not copy the vector of shared pointers on every call.
+    const vectorized::VExprContextSPtrs& get_conjuncts() const { return _conjuncts; }
+
+private:
+    // Placeholder stored as the mapped value of every hash table entry; set in
+    // the constructor.
+    char* dummy_mapped_data = nullptr;
+    void _emplace_into_hash_table_to_distinct(IColumn::Selector& distinct_row,
+                                              ColumnRawPtrs& key_columns, const size_t num_rows);
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index d7ebfe0a51..f4da6d9aaf 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exec/vaggregation_node.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/Exprs_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/PlanNodes_types.h>
@@ -25,6 +26,7 @@
 #include <array>
 #include <atomic>
 #include <memory>
+#include <string>
 
 #include "exec/exec_node.h"
 #include "runtime/block_spill_manager.h"
@@ -102,27 +104,27 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
+          _hash_table_compute_timer(nullptr),
+          _hash_table_input_counter(nullptr),
+          _build_timer(nullptr),
+          _expr_timer(nullptr),
+          _exec_timer(nullptr),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _intermediate_tuple_desc(nullptr),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_merge(false),
-          _build_timer(nullptr),
           _serialize_key_timer(nullptr),
-          _exec_timer(nullptr),
           _merge_timer(nullptr),
-          _expr_timer(nullptr),
           _get_results_timer(nullptr),
           _serialize_data_timer(nullptr),
           _serialize_result_timer(nullptr),
           _deserialize_data_timer(nullptr),
-          _hash_table_compute_timer(nullptr),
           _hash_table_iterate_timer(nullptr),
           _insert_keys_to_column_timer(nullptr),
           _streaming_agg_timer(nullptr),
           _hash_table_size_counter(nullptr),
-          _hash_table_input_counter(nullptr),
           _max_row_size_counter(nullptr) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
@@ -454,7 +456,6 @@ Status AggregationNode::prepare_profile(RuntimeState* 
state) {
         }
 
         if (_is_streaming_preagg) {
-            runtime_profile()->append_exec_option("Streaming Preaggregation");
             _executor.pre_agg =
                     
std::bind<Status>(&AggregationNode::_pre_agg_with_serialized_key, this,
                                       std::placeholders::_1, 
std::placeholders::_2);
@@ -478,6 +479,14 @@ Status AggregationNode::prepare_profile(RuntimeState* 
state) {
                                _needs_finalize;      // agg's finalize step
     }
 
+    fmt::memory_buffer msg;
+    fmt::format_to(msg,
+                   "(_is_merge: {}, _needs_finalize: {}, Streaming 
Preaggregation: {}, agg size: "
+                   "{}, limit: {})",
+                   _is_merge ? "true" : "false", _needs_finalize ? "true" : 
"false",
+                   _is_streaming_preagg ? "true" : "false",
+                   std::to_string(_aggregate_evaluators.size()), 
std::to_string(_limit));
+    runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg));
     return Status::OK();
 }
 
@@ -918,7 +927,9 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                 _pre_serialize_key_if_need(state, agg_method, key_columns, 
num_rows);
 
                 if constexpr (HashTableTraits<HashTableType>::is_phmap) {
-                    if (_hash_values.size() < num_rows) 
_hash_values.resize(num_rows);
+                    if (_hash_values.size() < num_rows) {
+                        _hash_values.resize(num_rows);
+                    }
                     if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                           AggState>::value) {
                         for (size_t i = 0; i < num_rows; ++i) {
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index e31240cdbc..7d560dae89 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -818,7 +818,7 @@ struct SpillPartitionHelper {
 };
 
 // not support spill
-class AggregationNode final : public ::doris::ExecNode {
+class AggregationNode : public ::doris::ExecNode {
 public:
     using Sizes = std::vector<size_t>;
 
@@ -836,18 +836,34 @@ public:
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block);
     bool is_streaming_preagg() const { return _is_streaming_preagg; }
+    bool is_aggregate_evaluators_empty() const { return _aggregate_evaluators.empty(); }
+    void _make_nullable_output_key(Block* block);
 
-private:
-    friend class pipeline::AggSinkOperator;
-    friend class pipeline::StreamingAggSinkOperator;
-    friend class pipeline::AggSourceOperator;
-    friend class pipeline::StreamingAggSourceOperator;
+protected:
+    bool _is_streaming_preagg;
+    bool _child_eos = false;
+    Block _preagg_block = Block();
+    ArenaUPtr _agg_arena_pool;
     // group by k1,k2
     VExprContextSPtrs _probe_expr_ctxs;
+    AggregatedDataVariantsUPtr _agg_data;
+
+    std::vector<size_t> _probe_key_sz;
+    std::vector<size_t> _hash_values;
     // left / full join will change the key nullable make output/input solt
     // nullable diff. so we need make nullable of it.
     std::vector<size_t> _make_nullable_keys;
-    std::vector<size_t> _probe_key_sz;
+    RuntimeProfile::Counter* _hash_table_compute_timer;
+    RuntimeProfile::Counter* _hash_table_input_counter;
+    RuntimeProfile::Counter* _build_timer;
+    RuntimeProfile::Counter* _expr_timer;
+    RuntimeProfile::Counter* _exec_timer;
+
+private:
+    friend class pipeline::AggSinkOperator;
+    friend class pipeline::StreamingAggSinkOperator;
+    friend class pipeline::AggSourceOperator;
+    friend class pipeline::StreamingAggSourceOperator;
 
     std::vector<AggFnEvaluator*> _aggregate_evaluators;
     bool _can_short_circuit = false;
@@ -873,47 +889,32 @@ private:
     size_t _external_agg_bytes_threshold;
     size_t _partitioned_threshold = 0;
 
-    AggregatedDataVariantsUPtr _agg_data;
-
     AggSpillContext _spill_context;
     std::unique_ptr<SpillPartitionHelper> _spill_partition_helper;
 
-    ArenaUPtr _agg_arena_pool;
-
-    RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _build_table_convert_timer;
     RuntimeProfile::Counter* _serialize_key_timer;
-    RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _merge_timer;
-    RuntimeProfile::Counter* _expr_timer;
     RuntimeProfile::Counter* _get_results_timer;
     RuntimeProfile::Counter* _serialize_data_timer;
     RuntimeProfile::Counter* _serialize_result_timer;
     RuntimeProfile::Counter* _deserialize_data_timer;
-    RuntimeProfile::Counter* _hash_table_compute_timer;
     RuntimeProfile::Counter* _hash_table_iterate_timer;
     RuntimeProfile::Counter* _insert_keys_to_column_timer;
     RuntimeProfile::Counter* _streaming_agg_timer;
     RuntimeProfile::Counter* _hash_table_size_counter;
-    RuntimeProfile::Counter* _hash_table_input_counter;
     RuntimeProfile::Counter* _max_row_size_counter;
-
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::Counter* _hash_table_memory_usage;
     RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage;
 
-    bool _is_streaming_preagg;
-    Block _preagg_block = Block();
     bool _should_expand_hash_table = true;
-    bool _child_eos = false;
-
     bool _should_limit_output = false;
     bool _reach_limit = false;
     bool _agg_data_created_without_key = false;
 
     PODArray<AggregateDataPtr> _places;
     std::vector<char> _deserialize_buffer;
-    std::vector<size_t> _hash_values;
     std::vector<AggregateDataPtr> _values;
     std::unique_ptr<AggregateDataContainer> _aggregate_data_container;
 
@@ -924,8 +925,6 @@ private:
 
     size_t _get_hash_table_size();
 
-    void _make_nullable_output_key(Block* block);
-
     Status _create_agg_status(AggregateDataPtr data);
     Status _destroy_agg_status(AggregateDataPtr data);
 
@@ -956,6 +955,7 @@ private:
     void _close_with_serialized_key();
     void _init_hash_method(const VExprContextSPtrs& probe_exprs);
 
+protected:
     template <typename AggState, typename AggMethod>
     void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
                                     const ColumnRawPtrs& key_columns, const size_t num_rows) {
@@ -970,6 +970,7 @@ private:
         }
     }
 
+private:
     template <bool limit>
     Status _execute_with_serialized_key_helper(Block* block) {
         SCOPED_TIMER(_build_timer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
index c37d66471d..d19f63f658 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/WindowFunnel.java
@@ -31,7 +31,6 @@ import org.apache.doris.nereids.types.DateType;
 import org.apache.doris.nereids.types.DateV2Type;
 import org.apache.doris.nereids.types.IntegerType;
 import org.apache.doris.nereids.types.StringType;
-import org.apache.doris.nereids.types.VarcharType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.base.Preconditions;
@@ -47,12 +46,7 @@ public class WindowFunnel extends AggregateFunction
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(IntegerType.INSTANCE)
-                    .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE),
-            FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT)
-                    .args(BigIntType.INSTANCE,
-                            StringType.INSTANCE,
-                            DateTimeV2Type.SYSTEM_DEFAULT,
-                            BooleanType.INSTANCE)
+                    .varArgs(BigIntType.INSTANCE, StringType.INSTANCE, DateTimeType.INSTANCE, BooleanType.INSTANCE)
     );
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to