This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 31a5e072e76 [refactor](pipelineX) Simplify set operation (#25502)
31a5e072e76 is described below

commit 31a5e072e76eb6f5956c1fedc135cc6e95d982d8
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Oct 17 15:11:46 2023 +0800

    [refactor](pipelineX) Simplify set operation (#25502)
---
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  6 +--
 be/src/pipeline/exec/set_probe_sink_operator.h     |  5 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  6 +--
 be/src/pipeline/exec/set_sink_operator.h           |  6 ++-
 .../vec/common/hash_table/hash_table_set_build.h   | 55 +++-------------------
 .../vec/common/hash_table/hash_table_set_probe.h   | 52 +++-----------------
 be/src/vec/exec/vset_operation_node.cpp            |  2 +-
 be/src/vec/exec/vset_operation_node.h              |  3 ++
 8 files changed, 30 insertions(+), 105 deletions(-)

diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 81c30d45d1e..aea9aff0e75 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -131,9 +131,9 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
                 [&](auto&& arg) -> Status {
                     using HashTableCtxType = std::decay_t<decltype(arg)>;
                     if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                        vectorized::HashTableProbeX<HashTableCtxType, is_intersect>
-                                process_hashtable_ctx(local_state, probe_rows);
-                        return process_hashtable_ctx.mark_data_in_hashtable(local_state, arg);
+                        vectorized::HashTableProbe<HashTableCtxType, is_intersect>
+                                process_hashtable_ctx(&local_state, probe_rows);
+                        return process_hashtable_ctx.mark_data_in_hashtable(arg);
                     } else {
                         LOG(FATAL) << "FATAL: uninited hash table";
                     }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 2c2f1ce1c60..45176fd0099 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -31,7 +31,7 @@ class RuntimeState;
 namespace vectorized {
 class Block;
 template <class HashTableContext, bool is_intersected>
-struct HashTableProbeX;
+struct HashTableProbe;
 } // namespace vectorized
 
 namespace pipeline {
@@ -81,11 +81,12 @@ public:
             : Base(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    int64_t* valid_element_in_hash_tbl() { return &_shared_state->valid_element_in_hash_tbl; }
 
 private:
     friend class SetProbeSinkOperatorX<is_intersect>;
     template <class HashTableContext, bool is_intersected>
-    friend struct vectorized::HashTableProbeX;
+    friend struct vectorized::HashTableProbe;
 
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 604729a4700..6725deffa14 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -116,9 +116,9 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
             [&](auto&& arg) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    vectorized::HashTableBuildX<HashTableCtxType, is_intersect>
-                            hash_table_build_process(rows, raw_ptrs, offset, state);
-                    static_cast<void>(hash_table_build_process(local_state, arg));
+                    vectorized::HashTableBuild<HashTableCtxType, is_intersect>
+                            hash_table_build_process(&local_state, rows, raw_ptrs, offset, state);
+                    static_cast<void>(hash_table_build_process(arg, local_state._arena));
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 945ec06891c..5383b1b3a55 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -29,7 +29,7 @@ class ExecNode;
 
 namespace vectorized {
 template <class HashTableContext, bool is_intersected>
-struct HashTableBuildX;
+struct HashTableBuild;
 }
 
 namespace pipeline {
@@ -74,10 +74,12 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
+    int64_t* mem_used() { return &_shared_state->mem_used; };
+
 private:
     friend class SetSinkOperatorX<is_intersect>;
     template <class HashTableContext, bool is_intersected>
-    friend struct vectorized::HashTableBuildX;
+    friend struct vectorized::HashTableBuild;
 
     RuntimeProfile::Counter* _build_timer; // time to build hash table
     vectorized::MutableBlock _mutable_block;
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h
index ff1fec3ab1c..e3c1ed27b1f 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -23,13 +23,13 @@
 namespace doris::vectorized {
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
-    HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
-                   VSetOperationNode<is_intersect>* operation_node, uint8_t offset,
+    template <typename Parent>
+    HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset,
                    RuntimeState* state)
-            : _rows(rows),
+            : _mem_used(parent->mem_used()),
+              _rows(rows),
               _offset(offset),
               _build_raw_ptrs(build_raw_ptrs),
-              _operation_node(operation_node),
               _state(state) {}
 
     Status operator()(HashTableContext& hash_table_ctx, Arena& arena) {
@@ -39,7 +39,7 @@ struct HashTableBuild {
 
         Defer defer {[&]() {
             int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            _operation_node->_mem_used += bucket_bytes - old_bucket_bytes;
+            *_mem_used += bucket_bytes - old_bucket_bytes;
         }};
 
         KeyGetter key_getter(_build_raw_ptrs);
@@ -62,54 +62,11 @@ struct HashTableBuild {
     }
 
 private:
+    int64_t* _mem_used;
     const int _rows;
     const uint8_t _offset;
     ColumnRawPtrs& _build_raw_ptrs;
-    VSetOperationNode<is_intersect>* _operation_node;
     RuntimeState* _state;
 };
 
-template <class HashTableContext, bool is_intersect>
-struct HashTableBuildX {
-    HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state)
-            : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), _state(state) {}
-
-    Status operator()(pipeline::SetSinkLocalState<is_intersect>& local_state,
-                      HashTableContext& hash_table_ctx) {
-        using KeyGetter = typename HashTableContext::State;
-        using Mapped = typename HashTableContext::Mapped;
-        int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-
-        Defer defer {[&]() {
-            int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            local_state._shared_state->mem_used += bucket_bytes - old_bucket_bytes;
-        }};
-
-        KeyGetter key_getter(_build_raw_ptrs);
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);
-
-        size_t k = 0;
-        auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-            HashTableContext::try_presis_key(key, origin, local_state._arena);
-            ctor(key, Mapped {k, _offset});
-        };
-        auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; };
-
-        for (; k < _rows; ++k) {
-            if (k % CHECK_FRECUENCY == 0) {
-                RETURN_IF_CANCELLED(_state);
-            }
-            hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key);
-        }
-        return Status::OK();
-    }
-
-private:
-    const int _rows;
-    const uint8_t _offset;
-    ColumnRawPtrs& _build_raw_ptrs;
-    RuntimeState* _state;
-    std::vector<size_t> _build_side_hash_values;
-};
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h
index 4a79b86a146..eb00cca8561 100644
--- a/be/src/vec/common/hash_table/hash_table_set_probe.h
+++ b/be/src/vec/common/hash_table/hash_table_set_probe.h
@@ -24,10 +24,11 @@ namespace doris::vectorized {
 
 template <class HashTableContext, bool is_intersected>
 struct HashTableProbe {
-    HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int probe_rows)
-            : _operation_node(operation_node),
+    template <typename Parent>
+    HashTableProbe(Parent* parent, int probe_rows)
+            : _valid_element_in_hash_tbl(parent->valid_element_in_hash_tbl()),
               _probe_rows(probe_rows),
-              _probe_raw_ptrs(operation_node->_probe_columns) {}
+              _probe_raw_ptrs(parent->_probe_columns) {}
 
     Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
         using KeyGetter = typename HashTableContext::State;
@@ -43,49 +44,9 @@ struct HashTableProbe {
                     if (!(it->visited)) {
                         it->visited = true;
                         if constexpr (is_intersected) { //intersected
-                            _operation_node->_valid_element_in_hash_tbl++;
+                            (*_valid_element_in_hash_tbl)++;
                         } else {
-                            _operation_node->_valid_element_in_hash_tbl--; //except
-                        }
-                    }
-                }
-            }
-        } else {
-            LOG(FATAL) << "Invalid RowRefListType!";
-        }
-        return Status::OK();
-    }
-
-private:
-    VSetOperationNode<is_intersected>* _operation_node;
-    const size_t _probe_rows;
-    ColumnRawPtrs& _probe_raw_ptrs;
-    std::vector<StringRef> _probe_keys;
-};
-
-template <class HashTableContext, bool is_intersected>
-struct HashTableProbeX {
-    HashTableProbeX(pipeline::SetProbeSinkLocalState<is_intersected>& local_state, int probe_rows)
-            : _probe_rows(probe_rows), _probe_raw_ptrs(local_state._probe_columns) {}
-
-    Status mark_data_in_hashtable(pipeline::SetProbeSinkLocalState<is_intersected>& local_state,
-                                  HashTableContext& hash_table_ctx) {
-        using KeyGetter = typename HashTableContext::State;
-
-        KeyGetter key_getter(_probe_raw_ptrs);
-        hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows);
-
-        if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) {
-            for (int probe_index = 0; probe_index < _probe_rows; probe_index++) {
-                auto find_result = hash_table_ctx.find(key_getter, probe_index);
-                if (find_result.is_found()) { //if found, marked visited
-                    auto it = find_result.get_mapped().begin();
-                    if (!(it->visited)) {
-                        it->visited = true;
-                        if constexpr (is_intersected) { //intersected
-                            local_state._shared_state->valid_element_in_hash_tbl++;
-                        } else {
-                            local_state._shared_state->valid_element_in_hash_tbl--; //except
+                            (*_valid_element_in_hash_tbl)--; //except
                         }
                     }
                 }
@@ -97,6 +58,7 @@ struct HashTableProbeX {
     }
 
 private:
+    int64_t* _valid_element_in_hash_tbl;
     const size_t _probe_rows;
     ColumnRawPtrs& _probe_raw_ptrs;
     std::vector<StringRef> _probe_keys;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9b15db67b3c..d284385b8ed 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -320,7 +320,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                     HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(
-                            rows, raw_ptrs, this, offset, state);
+                            this, rows, raw_ptrs, offset, state);
                     st = hash_table_build_process(arg, _arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index dfd96430115..ff016469f49 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -73,6 +73,9 @@ public:
 
     bool is_child_finished(int child_id) const;
 
+    int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl; }
+    int64_t* mem_used() { return &_mem_used; };
+
 private:
     void _finalize_probe(int child_id);
     //Todo: In build process of hashtable, It's same as join node.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to