This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9701457c364 [refine](SetOperator) refine some SetOperator code. 
(#49772)
9701457c364 is described below

commit 9701457c364c3f2bec6e2d2adc2da4676110350f
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Thu Apr 17 11:26:18 2025 +0800

    [refine](SetOperator) refine some SetOperator code. (#49772)
    
    ### What problem does this PR solve?
    
    Modified some parts of the SetOperator code.
    
    1. Child exprs are now evaluated through the local state (`local_state._child_exprs`) instead of the operator-level copy.
    2. Extracted the duplicated hash-table-size lookup into `SetSharedState::get_hash_table_size()`.
    3. Removed some unreachable code.
    4. In the source operator, result row indexes are now collected into a selector and copied with
    `append_data_by_selector` instead of per-row `insert_from`, to speed up output.
---
 be/src/pipeline/dependency.cpp                   | 13 ++++++++
 be/src/pipeline/dependency.h                     |  1 +
 be/src/pipeline/exec/set_probe_sink_operator.cpp | 42 ++++++++++--------------
 be/src/pipeline/exec/set_sink_operator.cpp       | 30 +++++------------
 be/src/pipeline/exec/set_source_operator.cpp     | 35 +++++++++++---------
 be/src/pipeline/exec/set_source_operator.h       |  5 ++-
 6 files changed, 62 insertions(+), 64 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index c8a9f5ed528..5624959f630 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -432,6 +432,19 @@ Status SetSharedState::update_build_not_ignore_null(const 
vectorized::VExprConte
     return Status::OK();
 }
 
+size_t SetSharedState::get_hash_table_size() const {
+    size_t hash_table_size = 0;
+    std::visit(
+            [&](auto&& arg) {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    hash_table_size = arg.hash_table->size();
+                }
+            },
+            hash_table_variants->method_variant);
+    return hash_table_size;
+}
+
 Status SetSharedState::hash_table_init() {
     std::vector<vectorized::DataTypePtr> data_types;
     for (size_t i = 0; i != child_exprs_lists[0].size(); ++i) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index cbb2e043a2a..ef8ed63eb16 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -676,6 +676,7 @@ public:
     // If a calculation involves both nullable and non-nullable columns, the 
final output should be a nullable column
     Status update_build_not_ignore_null(const vectorized::VExprContextSPtrs& 
ctxs);
 
+    size_t get_hash_table_size() const;
     /// init in both upstream side.
     //The i-th result expr list refers to the i-th child.
     std::vector<vectorized::VExprContextSPtrs> child_exprs_lists;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 5abf8b36ab0..c2c9e6741e8 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -144,22 +144,25 @@ Status 
SetProbeSinkOperatorX<is_intersect>::_extract_probe_column(
         vectorized::ColumnRawPtrs& raw_ptrs, int child_id) {
     auto& build_not_ignore_null = 
local_state._shared_state->build_not_ignore_null;
 
-    for (size_t i = 0; i < _child_exprs.size(); ++i) {
+    auto& child_exprs = local_state._child_exprs;
+    for (size_t i = 0; i < child_exprs.size(); ++i) {
         int result_col_id = -1;
-        RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_col_id));
+        RETURN_IF_ERROR(child_exprs[i]->execute(&block, &result_col_id));
 
         block.get_by_position(result_col_id).column =
                 
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
-        auto column = block.get_by_position(result_col_id).column.get();
-
-        if (auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
-            auto& col_nested = nullable->get_nested_column();
-            if (build_not_ignore_null[i]) { //same as build column
-                raw_ptrs[i] = nullable;
-            } else {
-                raw_ptrs[i] = &col_nested;
+        const auto* column = block.get_by_position(result_col_id).column.get();
+
+        if (const auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+            if (!build_not_ignore_null[i]) {
+                return Status::InternalError(
+                        "SET operator expects a nullable : {} column in column 
{}, but the "
+                        "computed "
+                        "output is a nullable : {} column",
+                        build_not_ignore_null[i], i,
+                        nullable->get_nested_column_ptr()->is_nullable());
             }
-
+            raw_ptrs[i] = nullable;
         } else {
             if (build_not_ignore_null[i]) {
                 auto column_ptr = 
make_nullable(block.get_by_position(result_col_id).column, false);
@@ -179,22 +182,10 @@ template <bool is_intersect>
 void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
         SetProbeSinkLocalState<is_intersect>& local_state) {
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
-    auto& hash_table_variants = local_state._shared_state->hash_table_variants;
-
     if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) {
         _refresh_hash_table(local_state);
-        if constexpr (is_intersect) {
-            valid_element_in_hash_tbl = 0;
-        } else {
-            std::visit(
-                    [&](auto&& arg) {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                            valid_element_in_hash_tbl = arg.hash_table->size();
-                        }
-                    },
-                    hash_table_variants->method_variant);
-        }
+        uint64_t hash_table_size = 
local_state._shared_state->get_hash_table_size();
+        valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
         local_state._probe_columns.resize(
                 local_state._shared_state->child_exprs_lists[_cur_child_id + 
1].size());
         
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
@@ -256,6 +247,7 @@ void 
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
                         }
                         arg.hash_table = std::move(tmp_hash_table);
                     } else if (is_intersect) {
+                        DCHECK_EQ(valid_element_in_hash_tbl, 
arg.hash_table->size());
                         while (iter != iter_end) {
                             auto& mapped = iter->get_second();
                             auto* it = &mapped;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6c5b4483915..41fd67aabf8 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -26,19 +26,6 @@
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
-uint64_t get_hash_table_size(const auto& hash_table_variant) {
-    uint64_t hash_table_size = 0;
-    std::visit(
-            [&](auto&& arg) {
-                using HashTableCtxType = std::decay_t<decltype(arg)>;
-                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    hash_table_size = arg.hash_table->size();
-                }
-            },
-            hash_table_variant);
-    return hash_table_size;
-}
-
 template <bool is_intersect>
 Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
@@ -58,8 +45,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* 
state, Status exec_s
     if (!_terminated && _runtime_filter_producer_helper && 
!state->is_cancelled()) {
         try {
             RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
-                    state, &_shared_state->build_block,
-                    
get_hash_table_size(_shared_state->hash_table_variants->method_variant)));
+                    state, &_shared_state->build_block, 
_shared_state->get_hash_table_size()));
         } catch (Exception& e) {
             return Status::InternalError(
                     "rf process meet error: {}, _terminated: {}, 
_finish_dependency: {}",
@@ -105,12 +91,12 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
         local_state._mutable_block.clear();
 
         if (eos) {
-            uint64_t hash_table_size = get_hash_table_size(
-                    
local_state._shared_state->hash_table_variants->method_variant);
+            uint64_t hash_table_size = 
local_state._shared_state->get_hash_table_size();
             valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
 
             
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
                     ->set_ready();
+            DCHECK_GT(_child_quantity, 1);
             
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
                     state, hash_table_size, local_state._finish_dependency));
         }
@@ -152,16 +138,18 @@ template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::_extract_build_column(
         SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
         vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
-    std::vector<int> result_locs(local_state._child_exprs.size(), -1);
+    // use local state child exprs
+    auto& child_expr = local_state._child_exprs;
+    std::vector<int> result_locs(child_expr.size(), -1);
     bool is_all_const = true;
 
-    for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
-        RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, 
&result_locs[i]));
+    for (size_t i = 0; i < child_expr.size(); ++i) {
+        RETURN_IF_ERROR(child_expr[i]->execute(&block, &result_locs[i]));
         is_all_const &= 
is_column_const(*block.get_by_position(result_locs[i]).column);
     }
     rows = is_all_const ? 1 : rows;
 
-    for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
+    for (size_t i = 0; i < child_expr.size(); ++i) {
         size_t result_col_id = result_locs[i];
 
         if (is_all_const) {
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index bc2dc32d577..6d464513b1f 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -127,34 +127,42 @@ Status 
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
         vectorized::Block* output_block, const int batch_size, bool* eos) {
     size_t left_col_len = local_state._left_table_data_types.size();
     hash_table_ctx.init_iterator();
-    auto block_size = 0;
+    local_state._result_indexs.clear();
+    local_state._result_indexs.reserve(batch_size);
 
-    auto add_result = [&local_state, &block_size, this](auto value) {
+    auto add_result = [&local_state](auto value) {
         auto* it = &value;
         if constexpr (is_intersect) {
             if (it->visited) { //intersected: have done probe, so visited 
values it's the result
-                _add_result_columns(local_state, value, block_size);
+                local_state._result_indexs.push_back(value.row_num);
             }
         } else {
             if (!it->visited) { //except: haven't visited values it's the 
needed result
-                _add_result_columns(local_state, value, block_size);
+                local_state._result_indexs.push_back(value.row_num);
             }
         }
     };
 
     auto& iter = hash_table_ctx.iterator;
-    for (; iter != hash_table_ctx.hash_table->end() && block_size < 
batch_size; ++iter) {
+    while (iter != hash_table_ctx.hash_table->end() &&
+           local_state._result_indexs.size() < batch_size) {
         add_result(iter->get_second());
+        ++iter;
     }
 
     *eos = iter == hash_table_ctx.hash_table->end();
     if (*eos && hash_table_ctx.hash_table->has_null_key_data()) {
         auto value = hash_table_ctx.hash_table->template 
get_null_key_data<RowRefWithFlag>();
+        // If the hashmap can store nulldata, the return value is 
RowRefWithFlag, otherwise it is char*
+        static_assert(std::is_same_v<RowRefWithFlag, 
std::decay_t<decltype(value)>> ||
+                      std::is_same_v<char*, std::decay_t<decltype(value)>>);
         if constexpr (std::is_same_v<RowRefWithFlag, 
std::decay_t<decltype(value)>>) {
             add_result(value);
         }
     }
 
+    local_state._add_result_columns();
+
     if (!output_block->mem_reuse()) {
         for (int i = 0; i < left_col_len; ++i) {
             output_block->insert(
@@ -169,18 +177,15 @@ Status 
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
 }
 
 template <bool is_intersect>
-void SetSourceOperatorX<is_intersect>::_add_result_columns(
-        SetSourceLocalState<is_intersect>& local_state, RowRefWithFlag& value, 
int& block_size) {
-    auto& build_col_idx = local_state._shared_state->build_col_idx;
-    auto& build_block = local_state._shared_state->build_block;
-
-    for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
-        auto& column = *build_block.get_by_position(idx->second).column;
-        local_state._mutable_cols[idx->first]->insert_from(column, 
value.row_num);
+void SetSourceLocalState<is_intersect>::_add_result_columns() {
+    auto& build_col_idx = _shared_state->build_col_idx;
+    auto& build_block = _shared_state->build_block;
+
+    for (auto& idx : build_col_idx) {
+        const auto& column = *build_block.get_by_position(idx.second).column;
+        column.append_data_by_selector(_mutable_cols[idx.first], 
_result_indexs);
     }
-    block_size++;
 }
-
 template class SetSourceLocalState<true>;
 template class SetSourceLocalState<false>;
 template class SetSourceOperatorX<true>;
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index 20cfd885e04..a023888de58 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -41,6 +41,7 @@ public:
     Status open(RuntimeState* state) override;
 
 private:
+    void _add_result_columns();
     friend class SetSourceOperatorX<is_intersect>;
     friend class OperatorX<SetSourceLocalState<is_intersect>>;
     std::vector<vectorized::MutableColumnPtr> _mutable_cols;
@@ -49,6 +50,7 @@ private:
 
     RuntimeProfile::Counter* _get_data_timer = nullptr;
     RuntimeProfile::Counter* _filter_timer = nullptr;
+    vectorized::IColumn::Selector _result_indexs;
 };
 
 template <bool is_intersect>
@@ -90,9 +92,6 @@ private:
     Status _get_data_in_hashtable(SetSourceLocalState<is_intersect>& 
local_state,
                                   HashTableContext& hash_table_ctx, 
vectorized::Block* output_block,
                                   const int batch_size, bool* eos);
-
-    void _add_result_columns(SetSourceLocalState<is_intersect>& local_state, 
RowRefWithFlag& value,
-                             int& block_size);
     const size_t _child_quantity;
 };
 #include "common/compile_check_end.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to