This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dae8f607e17 [Improvement](set) change set_operator's hash map to phmap (#43273)
dae8f607e17 is described below

commit dae8f607e176480306471754d0391df3daeee8a7
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Nov 11 14:18:02 2024 +0800

    [Improvement](set) change set_operator's hash map to phmap (#43273)
    
    ### What problem does this PR solve?
    
    1. Change set_operator's hash map to phmap.
    2. Optimize the hash table refresh logic.
    
    ```sql
    +-------+--------------+------+-------+---------+-------+
    | Field | Type         | Null | Key   | Default | Extra |
    +-------+--------------+------+-------+---------+-------+
    | k1    | int          | Yes  | true  | NULL    |       |
    | k2    | int          | No   | false | NULL    | NONE  |
    | k3    | bigint       | Yes  | false | NULL    | NONE  |
    | k4    | varchar(100) | Yes  | false | NULL    | NONE  |
    +-------+--------------+------+-------+---------+-------+
    ```
    
    ```sql
    select count(*) from (select * from a_table intersect select * from b_table)t;
    before: Time(ms)=37768|CpuTimeMS=61754|PeakMemoryBytes=129921088
    after:  Time(ms)=25218|CpuTimeMS=44571|PeakMemoryBytes=129902624
    ```
    ```sql
    select count(*) from (select k1 from a_table intersect select k1 from b_table)t;
    before: Time(ms)=7964|CpuTimeMS=13431|PeakMemoryBytes=130062400
    after:  Time(ms)=7459|CpuTimeMS=11976|PeakMemoryBytes=130091048
    ```
---
 be/src/pipeline/common/set_utils.h               | 74 +++++++++---------------
 be/src/pipeline/exec/join/join_op.h              | 18 ++----
 be/src/pipeline/exec/set_probe_sink_operator.cpp | 74 +++++++++++++-----------
 be/src/pipeline/exec/set_source_operator.cpp     | 17 +++++-
 be/src/vec/common/hash_table/hash_map.h          |  5 --
 be/src/vec/common/hash_table/hash_map_context.h  |  7 +--
 be/src/vec/common/hash_table/ph_hash_map.h       | 11 ++--
 be/src/vec/common/hash_table/ph_hash_set.h       |  3 +-
 be/src/vec/common/hash_table/string_hash_map.h   |  1 +
 9 files changed, 96 insertions(+), 114 deletions(-)

diff --git a/be/src/pipeline/common/set_utils.h 
b/be/src/pipeline/common/set_utils.h
index ed64035fb42..2caf5b7d0b8 100644
--- a/be/src/pipeline/common/set_utils.h
+++ b/be/src/pipeline/common/set_utils.h
@@ -25,21 +25,32 @@
 
 namespace doris {
 
-template <class Key>
-using SetFixedKeyHashTableContext =
-        vectorized::MethodKeysFixed<HashMap<Key, 
pipeline::RowRefListWithFlags, HashCRC32<Key>>>;
+template <typename T>
+using SetData = PHHashMap<T, RowRefListWithFlags, HashCRC32<T>>;
 
-template <class T>
-using SetPrimaryTypeHashTableContext =
-        vectorized::MethodOneNumber<T, HashMap<T, 
pipeline::RowRefListWithFlags, HashCRC32<T>>>;
+template <typename T>
+using SetFixedKeyHashTableContext = vectorized::MethodKeysFixed<SetData<T>>;
+
+template <typename T>
+using SetPrimaryTypeHashTableContext = vectorized::MethodOneNumber<T, 
SetData<T>>;
+
+template <typename T>
+using SetPrimaryTypeHashTableContextNullable = 
vectorized::MethodSingleNullableColumn<
+        vectorized::MethodOneNumber<T, 
vectorized::DataWithNullKey<SetData<T>>>>;
 
 using SetSerializedHashTableContext =
-        vectorized::MethodSerialized<HashMap<StringRef, 
pipeline::RowRefListWithFlags>>;
+        vectorized::MethodSerialized<PHHashMap<StringRef, 
RowRefListWithFlags>>;
 using SetMethodOneString =
-        vectorized::MethodStringNoCache<HashMap<StringRef, 
pipeline::RowRefListWithFlags>>;
+        vectorized::MethodStringNoCache<PHHashMap<StringRef, 
RowRefListWithFlags>>;
 
 using SetHashTableVariants =
         std::variant<std::monostate, SetSerializedHashTableContext, 
SetMethodOneString,
+                     SetPrimaryTypeHashTableContextNullable<vectorized::UInt8>,
+                     
SetPrimaryTypeHashTableContextNullable<vectorized::UInt16>,
+                     
SetPrimaryTypeHashTableContextNullable<vectorized::UInt32>,
+                     
SetPrimaryTypeHashTableContextNullable<vectorized::UInt64>,
+                     
SetPrimaryTypeHashTableContextNullable<vectorized::UInt128>,
+                     
SetPrimaryTypeHashTableContextNullable<vectorized::UInt256>,
                      SetPrimaryTypeHashTableContext<vectorized::UInt8>,
                      SetPrimaryTypeHashTableContext<vectorized::UInt16>,
                      SetPrimaryTypeHashTableContext<vectorized::UInt32>,
@@ -51,9 +62,9 @@ using SetHashTableVariants =
                      SetFixedKeyHashTableContext<vectorized::UInt256>,
                      SetFixedKeyHashTableContext<vectorized::UInt136>>;
 
-struct SetDataVariants {
-    SetHashTableVariants method_variant;
-
+struct SetDataVariants
+        : public DataVariants<SetHashTableVariants, 
vectorized::MethodSingleNullableColumn,
+                              vectorized::MethodOneNumber, 
vectorized::DataWithNullKey> {
     void init(const std::vector<vectorized::DataTypePtr>& data_types, 
HashKeyType type) {
         bool nullable = data_types.size() == 1 && data_types[0]->is_nullable();
         switch (type) {
@@ -61,51 +72,22 @@ struct SetDataVariants {
             method_variant.emplace<SetSerializedHashTableContext>();
             break;
         case HashKeyType::int8_key:
-            if (nullable) {
-                
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
-                        get_key_sizes(data_types));
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt8>>();
-            }
+            emplace_single<vectorized::UInt8, 
SetData<vectorized::UInt8>>(nullable);
             break;
         case HashKeyType::int16_key:
-            if (nullable) {
-                
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
-                        get_key_sizes(data_types));
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt16>>();
-            }
+            emplace_single<vectorized::UInt16, 
SetData<vectorized::UInt16>>(nullable);
             break;
         case HashKeyType::int32_key:
-            if (nullable) {
-                
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
-                        get_key_sizes(data_types));
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt32>>();
-            }
+            emplace_single<vectorized::UInt32, 
SetData<vectorized::UInt32>>(nullable);
             break;
         case HashKeyType::int64_key:
-            if (nullable) {
-                
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt128>>(
-                        get_key_sizes(data_types));
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt64>>();
-            }
+            emplace_single<vectorized::UInt64, 
SetData<vectorized::UInt64>>(nullable);
             break;
         case HashKeyType::int128_key:
-            if (nullable) {
-                
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt136>>(
-                        get_key_sizes(data_types));
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt128>>();
-            }
+            emplace_single<vectorized::UInt128, 
SetData<vectorized::UInt128>>(nullable);
             break;
         case HashKeyType::int256_key:
-            if (nullable) {
-                method_variant.emplace<SetSerializedHashTableContext>();
-            } else {
-                
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt256>>();
-            }
+            emplace_single<vectorized::UInt256, 
SetData<vectorized::UInt256>>(nullable);
             break;
         case HashKeyType::string_key:
             method_variant.emplace<SetMethodOneString>();
diff --git a/be/src/pipeline/exec/join/join_op.h 
b/be/src/pipeline/exec/join/join_op.h
index 616753b72de..f3bd47a911e 100644
--- a/be/src/pipeline/exec/join/join_op.h
+++ b/be/src/pipeline/exec/join/join_op.h
@@ -20,7 +20,7 @@
 #include "vec/common/columns_hashing.h"
 #include "vec/core/block.h"
 
-namespace doris::pipeline {
+namespace doris {
 /**
  * Now we have different kinds of RowRef for join operation. Overall, RowRef 
is the base class and
  * the class inheritance is below:
@@ -129,12 +129,10 @@ struct RowRefList : RowRef {
     RowRefList() = default;
     RowRefList(size_t row_num_) : RowRef(row_num_) {}
 
-    ForwardIterator<RowRefList> begin() { return 
ForwardIterator<RowRefList>(this); }
+    ForwardIterator<RowRefList> begin() { return {this}; }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
-        next.emplace_back(std::move(row_ref));
-    }
+    void insert(RowRefType&& row_ref, vectorized::Arena& pool) { 
next.emplace_back(row_ref); }
 
     void clear() { next.clear(); }
 
@@ -149,9 +147,7 @@ struct RowRefListWithFlag : RowRef {
     RowRefListWithFlag() = default;
     RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
 
-    ForwardIterator<RowRefListWithFlag> const begin() {
-        return ForwardIterator<RowRefListWithFlag>(this);
-    }
+    ForwardIterator<RowRefListWithFlag> begin() { return {this}; }
 
     /// insert element after current one
     void insert(RowRefType&& row_ref, vectorized::Arena& pool) { 
next.emplace_back(row_ref); }
@@ -171,9 +167,7 @@ struct RowRefListWithFlags : RowRefWithFlag {
     RowRefListWithFlags() = default;
     RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
 
-    ForwardIterator<RowRefListWithFlags> const begin() {
-        return ForwardIterator<RowRefListWithFlags>(this);
-    }
+    ForwardIterator<RowRefListWithFlags> begin() { return {this}; }
 
     /// insert element after current one
     void insert(RowRefType&& row_ref, vectorized::Arena& pool) { 
next.emplace_back(row_ref); }
@@ -185,4 +179,4 @@ private:
     std::vector<RowRefType> next;
 };
 
-} // namespace doris::pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 813dad3ad79..4c250d5603b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -210,46 +210,52 @@ void 
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
             [&](auto&& arg) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    auto tmp_hash_table =
-                            std::make_shared<typename 
HashTableCtxType::HashMapType>();
-                    bool is_need_shrink =
-                            
arg.hash_table->should_be_shrink(valid_element_in_hash_tbl);
-                    if (is_intersect || is_need_shrink) {
-                        tmp_hash_table->init_buf_size(size_t(
-                                valid_element_in_hash_tbl / 
arg.hash_table->get_factor() + 1));
-                    }
-
                     arg.init_iterator();
                     auto& iter = arg.iterator;
                     auto iter_end = arg.hash_table->end();
-                    std::visit(
-                            [&](auto is_need_shrink_const) {
-                                while (iter != iter_end) {
-                                    auto& mapped = iter->get_second();
-                                    auto it = mapped.begin();
-
-                                    if constexpr (is_intersect) { //intersected
-                                        if (it->visited) {
-                                            it->visited = false;
-                                            
tmp_hash_table->insert(iter->get_value());
-                                        }
-                                        ++iter;
-                                    } else { //except
-                                        if constexpr (is_need_shrink_const) {
-                                            if (!it->visited) {
-                                                
tmp_hash_table->insert(iter->get_value());
-                                            }
-                                        }
-                                        ++iter;
-                                    }
-                                }
-                            },
-                            vectorized::make_bool_variant(is_need_shrink));
 
-                    arg.reset();
-                    if (is_intersect || is_need_shrink) {
+                    constexpr double need_shrink_ratio = 0.25;
+                    bool is_need_shrink =
+                            is_intersect
+                                    ? (valid_element_in_hash_tbl <
+                                       arg.hash_table
+                                               ->size()) // When intersect, 
shrink as long as the element decreases
+                                    : (valid_element_in_hash_tbl <
+                                       arg.hash_table->size() *
+                                               need_shrink_ratio); // When 
except, element decreases need to within the 'need_shrink_ratio' before 
shrinking
+
+                    if (is_need_shrink) {
+                        auto tmp_hash_table =
+                                std::make_shared<typename 
HashTableCtxType::HashMapType>();
+                        tmp_hash_table->reserve(
+                                
local_state._shared_state->valid_element_in_hash_tbl);
+                        while (iter != iter_end) {
+                            auto& mapped = iter->get_second();
+                            auto it = mapped.begin();
+
+                            if constexpr (is_intersect) {
+                                if (it->visited) {
+                                    it->visited = false;
+                                    tmp_hash_table->insert(iter->get_first(), 
iter->get_second());
+                                }
+                            } else {
+                                if (!it->visited) {
+                                    tmp_hash_table->insert(iter->get_first(), 
iter->get_second());
+                                }
+                            }
+                            ++iter;
+                        }
                         arg.hash_table = std::move(tmp_hash_table);
+                    } else if (is_intersect) {
+                        while (iter != iter_end) {
+                            auto& mapped = iter->get_second();
+                            auto it = mapped.begin();
+                            it->visited = false;
+                            ++iter;
+                        }
                     }
+
+                    arg.reset();
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                     __builtin_unreachable();
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index ebcd13ddf14..91c98288d8b 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -18,6 +18,7 @@
 #include "set_source_operator.h"
 
 #include <memory>
+#include <type_traits>
 
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
@@ -124,11 +125,9 @@ Status 
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
         vectorized::Block* output_block, const int batch_size, bool* eos) {
     size_t left_col_len = local_state._left_table_data_types.size();
     hash_table_ctx.init_iterator();
-    auto& iter = hash_table_ctx.iterator;
     auto block_size = 0;
 
-    for (; iter != hash_table_ctx.hash_table->end() && block_size < 
batch_size; ++iter) {
-        auto& value = iter->get_second();
+    auto add_result = [&local_state, &block_size, this](auto value) {
         auto it = value.begin();
         if constexpr (is_intersect) {
             if (it->visited) { //intersected: have done probe, so visited 
values it's the result
@@ -139,9 +138,21 @@ Status 
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
                 _add_result_columns(local_state, value, block_size);
             }
         }
+    };
+
+    auto& iter = hash_table_ctx.iterator;
+    for (; iter != hash_table_ctx.hash_table->end() && block_size < 
batch_size; ++iter) {
+        add_result(iter->get_second());
     }
 
     *eos = iter == hash_table_ctx.hash_table->end();
+    if (*eos && hash_table_ctx.hash_table->has_null_key_data()) {
+        auto value = hash_table_ctx.hash_table->template 
get_null_key_data<RowRefListWithFlags>();
+        if constexpr (std::is_same_v<RowRefListWithFlags, 
std::decay_t<decltype(value)>>) {
+            add_result(value);
+        }
+    }
+
     if (!output_block->mem_reuse()) {
         for (int i = 0; i < left_col_len; ++i) {
             output_block->insert(
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 448ddd5b7c5..8cb02d6a80d 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -201,11 +201,6 @@ using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, 
Hash>, Hash, Grower,
 template <typename Key, typename Hash = DefaultHash<Key>>
 using JoinHashMap = JoinHashTable<Key, Hash>;
 
-template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
-          typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
-using HashMapWithSavedHash =
-        HashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, 
Grower, Allocator>;
-
 template <typename Key, typename Mapped, typename Hash, size_t 
initial_size_degree>
 using HashMapWithStackMemory = HashMapTable<
         Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash,
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 16a793d7500..875c035b425 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -33,10 +33,6 @@
 #include "vec/core/types.h"
 #include "vec/utils/util.hpp"
 
-namespace doris::pipeline {
-struct RowRefListWithFlags;
-}
-
 namespace doris::vectorized {
 
 constexpr auto BITSIZE = 8;
@@ -587,8 +583,7 @@ struct DataWithNullKey : public Base {
 
 private:
     bool has_null_key = false;
-    // null_key_data store AggregateDataPtr on agg node, store PartitionBlocks 
on partition sort node.
-    void* null_key_data = nullptr;
+    Base::Value null_key_data;
 };
 
 /// Single low cardinality column.
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h 
b/be/src/vec/common/hash_table/ph_hash_map.h
index de320425223..414624c6e1a 100644
--- a/be/src/vec/common/hash_table/ph_hash_map.h
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -40,6 +40,7 @@ public:
 
     using key_type = Key;
     using mapped_type = Mapped;
+    using Value = Mapped;
     using value_type = std::pair<const Key, Mapped>;
 
     using LookupResult = std::pair<const Key, Mapped>*;
@@ -154,10 +155,8 @@ public:
                                                 [&](const auto& ctor) { 
f(ctor, key, key); });
     }
 
-    void ALWAYS_INLINE insert(const Key& key, size_t hash_value, const Mapped& 
value) {
-        auto it = &*_hash_map.lazy_emplace_with_hash(key, hash_value,
-                                                     [&](const auto& ctor) { 
ctor(key, value); });
-        it->second = value;
+    void ALWAYS_INLINE insert(const Key& key, const Mapped& value) {
+        _hash_map.lazy_emplace(key, [&](const auto& ctor) { ctor(key, value); 
});
     }
 
     template <typename KeyHolder>
@@ -190,8 +189,6 @@ public:
         return capacity * sizeof(typename HashMapImpl::slot_type);
     }
 
-    size_t get_buffer_size_in_cells() const { return _hash_map.capacity(); }
-
     bool add_elem_size_overflow(size_t row) const {
         const auto capacity = _hash_map.capacity();
         // phmap use 7/8th as maximum load factor.
@@ -209,7 +206,7 @@ public:
 
     void clear_and_shrink() { _hash_map.clear(); }
 
-    void expanse_for_add_elem(size_t num_elem) { _hash_map.reserve(num_elem); }
+    void reserve(size_t num_elem) { _hash_map.reserve(num_elem); }
 
 private:
     HashMapImpl _hash_map;
diff --git a/be/src/vec/common/hash_table/ph_hash_set.h 
b/be/src/vec/common/hash_table/ph_hash_set.h
index 6ace649b7bb..79faca9d670 100644
--- a/be/src/vec/common/hash_table/ph_hash_set.h
+++ b/be/src/vec/common/hash_table/ph_hash_set.h
@@ -36,6 +36,7 @@ public:
     using key_type = Key;
     using mapped_type = void;
     using value_type = void;
+    using Value = void*;
 
     using LookupResult = void*;
 
@@ -104,7 +105,7 @@ public:
 
     void clear_and_shrink() { _hash_set.clear(); }
 
-    void expanse_for_add_elem(size_t num_elem) { _hash_set.reserve(num_elem); }
+    void reserve(size_t num_elem) { _hash_set.reserve(num_elem); }
 
 private:
     HashSetImpl _hash_set;
diff --git a/be/src/vec/common/hash_table/string_hash_map.h 
b/be/src/vec/common/hash_table/string_hash_map.h
index 6c7a9e74dca..cffe32e82ce 100644
--- a/be/src/vec/common/hash_table/string_hash_map.h
+++ b/be/src/vec/common/hash_table/string_hash_map.h
@@ -114,6 +114,7 @@ public:
     using Base = StringHashTable<StringHashMapSubMaps<TMapped, Allocator>>;
     using Self = StringHashMap;
     using LookupResult = typename Base::LookupResult;
+    using Value = TMapped;
 
     using Base::Base;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to