This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54d1630c42 [Opt](vectorized) speed up hash function compute in hash 
partition (#12334)
54d1630c42 is described below

commit 54d1630c429ad8222f09a2b718eb29010317605c
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Sep 7 10:11:40 2022 +0800

    [Opt](vectorized) speed up hash function compute in hash partition (#12334)
    
    After optimizing the hash function, computing SipHash for HASH_PARTITION in vdata_stream_sender takes:
    
    Before: 1s800ms
    After: 800ms
---
 be/src/vec/columns/column.h                 | 21 +++++++++++++++++++++
 be/src/vec/columns/column_array.cpp         |  6 +++++-
 be/src/vec/columns/column_array.h           |  2 ++
 be/src/vec/columns/column_complex.h         |  6 ++++++
 be/src/vec/columns/column_const.cpp         | 17 +++++++++++++++++
 be/src/vec/columns/column_const.h           |  3 +++
 be/src/vec/columns/column_decimal.cpp       |  6 ++++++
 be/src/vec/columns/column_decimal.h         |  3 +++
 be/src/vec/columns/column_dummy.h           |  3 +++
 be/src/vec/columns/column_nullable.cpp      | 17 +++++++++++++++++
 be/src/vec/columns/column_nullable.h        |  2 ++
 be/src/vec/columns/column_string.h          |  6 ++++++
 be/src/vec/columns/column_vector.cpp        |  6 ++++++
 be/src/vec/columns/column_vector.h          |  3 +++
 be/src/vec/exprs/vruntimefilter_wrapper.cpp |  1 -
 be/src/vec/sink/vdata_stream_sender.cpp     |  5 +----
 16 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 565a9416d7..fc3ab1478d 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -30,6 +30,19 @@
 
 class SipHash;
 
+#define SIP_HASHES_FUNCTION_COLUMN_IMPL() /* Shared body of update_hashes_with_value(hashes, null_data) overrides: calls update_hash_with_value(i, hashes[i]) for every row i, or only for rows with null_data[i] == 0 when null_data != nullptr. Relies on the enclosing parameters being named hashes and null_data. */ \
+    auto s = hashes.size();                                              \
+    DCHECK(s == size());                                                 \
+    if (null_data == nullptr) {                                          \
+        for (size_t i = 0; i < s; i++) {                                 \
+            update_hash_with_value(i, hashes[i]);                        \
+        }                                                                \
+    } else {                                                             \
+        for (size_t i = 0; i < s; i++) {                                 \
+            if (null_data[i] == 0) update_hash_with_value(i, hashes[i]); \
+        }                                                                \
+    }
+
 namespace doris::vectorized {
 
 class Arena;
@@ -286,6 +299,14 @@ public:
     ///  passed bytes to hash must identify sequence of values unambiguously.
     virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0;
 
+    /// Batched form of update_hash_with_value(): updates hashes[i] with the value of row i for
+    /// every row in one virtual call instead of one call per row; hashes.size() must equal size().
+    /// null_data == nullptr: every row is hashed; otherwise only rows with null_data[i] == 0 are.
+    virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                          const uint8_t* __restrict null_data = nullptr) const {
+        LOG(FATAL) << "update_hashes_with_value not supported";
+    }
+
     /** Removes elements that don't match the filter.
       * Is used in WHERE and HAVING operations.
       * If result_size_hint > 0, then makes advance reserve(result_size_hint) 
for the result column;
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index c498b72345..db52f62c63 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -219,10 +219,15 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const {
     size_t array_size = size_at(n);
     size_t offset = offset_at(n);
 
     hash.update(array_size);
     for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash);
 }
 
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                           const uint8_t* __restrict null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL(); // per row: length prefix, then every element
+}
+
 void ColumnArray::insert(const Field& x) {
     const Array& array = doris::vectorized::get<const Array&>(x);
     size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 686089f4e9..820064505d 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -83,6 +83,8 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_hashes_with_value(std::vector<SipHash>& hashes, // batched update_hash_with_value
+                                  const uint8_t* __restrict null_data) const 
override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src_, size_t n) override;
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index 81e826d5b9..d7e99406e5 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -183,10 +183,16 @@ public:
         LOG(FATAL) << "deserialize_and_insert_from_arena not implemented";
     }
 
+    // Hashing may not be needed for this column type, so it is not implemented: both overloads below are no-ops.
     void update_hash_with_value(size_t n, SipHash& hash) const override {
         // TODO add hash function
     }
 
+    void update_hashes_with_value(std::vector<SipHash>& hash,
+                                  const uint8_t* __restrict null_data) const 
override {
+        // TODO add hash function; no-op for now, like update_hash_with_value above
+    }
+
     [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
                                 int nan_direction_hint) const override {
         LOG(FATAL) << "compare_at not implemented";
diff --git a/be/src/vec/columns/column_const.cpp 
b/be/src/vec/columns/column_const.cpp
index efb790249a..946074cf61 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -22,6 +22,7 @@
 
 #include "vec/columns/columns_common.h"
 #include "vec/common/pod_array.h"
+#include "vec/common/sip_hash.h"
 #include "vec/common/typeid_cast.h"
 
 namespace doris::vectorized {
@@ -86,6 +87,22 @@ ColumnPtr ColumnConst::permute(const Permutation& perm, 
size_t limit) const {
     return ColumnConst::create(data, limit);
 }
 
+void ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                           const uint8_t* __restrict null_data) const {
+    DCHECK(null_data == nullptr);
+    DCHECK(hashes.size() == size());
+    // Hash the single value through the nested column, exactly as the single-row
+    // update_hash_with_value() does, so a const column yields the same per-row
+    // hash as a materialized column holding that value. Hashing the raw
+    // get_data_at(0) bytes instead drops type-specific framing (e.g. the length
+    // prefix that ColumnString::update_hash_with_value mixes in), so equal keys
+    // could be routed to different channels by hash partitioning. NULL handling
+    // is left to the nested column as well.
+    for (auto& hash : hashes) {
+        data->update_hash_with_value(0, hash);
+    }
+}
+
 MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& 
selector) const {
     if (s != selector.size()) {
         LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of 
column ({})",
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 615c936a5e..be7f56ab23 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -132,6 +132,9 @@ public:
         data->update_hash_with_value(0, hash);
     }
 
+    void update_hashes_with_value(std::vector<SipHash>& hashes, // null_data must be nullptr
+                                  const uint8_t* __restrict null_data) const 
override;
+
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     ColumnPtr replicate(const Offsets& offsets) const override;
     void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index 1e34056286..91145e4473 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -120,6 +120,12 @@ void ColumnDecimal<T>::update_hash_with_value(size_t n, 
SipHash& hash) const {
     hash.update(data[n]);
 }
 
+template <typename T>
+void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                                const uint8_t* __restrict 
null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL(); // hashes[i] gets data[i]; rows with null_data[i] != 0 skipped
+}
+
 template <typename T>
 void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int,
                                        IColumn::Permutation& res) const {
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index a43227bd41..2b3dac0ae5 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -154,6 +154,9 @@ public:
                                        const uint8_t* null_map) override;
 
     void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_hashes_with_value(std::vector<SipHash>& hashes, // batched update_hash_with_value
+                                  const uint8_t* __restrict null_data) const override;
+
     int compare_at(size_t n, size_t m, const IColumn& rhs_, int 
nan_direction_hint) const override;
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
                          IColumn::Permutation& res) const override;
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index e7fb657161..96547bfd00 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -74,6 +74,9 @@ public:
 
     void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const 
override {}
 
+    void update_hashes_with_value(std::vector<SipHash>& /*hashes*/,
+                                  const uint8_t* __restrict /*null_data*/) const override {} // no-op, like update_hash_with_value
+
     void insert_from(const IColumn&, size_t) override { ++s; }
 
     void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t 
length) override {
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 14eebe3150..cff4d0f7c3 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -20,6 +20,7 @@
 
 #include "vec/columns/column_nullable.h"
 
+#include "util/simd/bits.h"
 #include "vec/columns/column_const.h"
 #include "vec/common/arena.h"
 #include "vec/common/assert_cast.h"
@@ -50,6 +51,22 @@ void ColumnNullable::update_hash_with_value(size_t n, 
SipHash& hash) const {
         get_nested_column().update_hash_with_value(n, hash);
 }
 
+void ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                              const uint8_t* __restrict null_data) const {
+    DCHECK(null_data == nullptr);
+    auto s = hashes.size();
+    DCHECK(s == size());
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(real_null_data), s) == s) {
+        nested_column->update_hashes_with_value(hashes, nullptr); // no NULL rows: hash every row
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            if (real_null_data[i] != 0) hashes[i].update(0); // NULL rows hash as 0 ...
+        }
+        nested_column->update_hashes_with_value(hashes, real_null_data); // ... and are skipped here
+    }
+}
+
 MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
     MutableColumnPtr new_nested_col = 
get_nested_column().clone_resized(new_size);
     auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 05807e10ec..397f3be1bd 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -159,6 +159,8 @@ public:
     ColumnPtr replicate(const Offsets& replicate_offsets) const override;
     void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
     void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_hashes_with_value(std::vector<SipHash>& hashes, // NULL rows hash as 0, others via nested column
+                                  const uint8_t* __restrict null_data) const 
override;
     void get_extremes(Field& min, Field& max) const override;
 
     MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) 
const override {
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index d8a704e362..d2744e20b7 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -246,10 +246,16 @@ public:
         size_t string_size = size_at(n);
         size_t offset = offset_at(n);
 
+        // TODO: Rethink whether we really need to hash string_size. It keeps multi-column row hashes unambiguous, e.g. ("ab", "c") vs ("a", "bc").
         hash.update(reinterpret_cast<const char*>(&string_size), 
sizeof(string_size));
         hash.update(reinterpret_cast<const char*>(&chars[offset]), 
string_size);
     }
 
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                  const uint8_t* __restrict null_data) const 
override {
+        SIP_HASHES_FUNCTION_COLUMN_IMPL(); // per row: length, then bytes, as in update_hash_with_value
+    }
+
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
     void insert_indices_from(const IColumn& src, const int* indices_begin,
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 279a777992..13c5810026 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -105,6 +105,12 @@ void ColumnVector<T>::update_hash_with_value(size_t n, 
SipHash& hash) const {
     hash.update(data[n]);
 }
 
+template <typename T>
+void ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                               const uint8_t* __restrict 
null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL(); // hashes[i] gets data[i]; rows with null_data[i] != 0 skipped
+}
+
 template <typename T>
 struct ColumnVector<T>::less {
     const Self& parent;
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index ad39b814f2..5101894b92 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -247,6 +247,9 @@ public:
 
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
+    void update_hashes_with_value(std::vector<SipHash>& hashes, // batched update_hash_with_value
+                                  const uint8_t* __restrict null_data) const 
override;
+
     size_t byte_size() const override { return data.size() * sizeof(data[0]); }
 
     size_t allocated_bytes() const override { return data.allocated_bytes(); }
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 869de36a26..bbd7cf576c 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -21,7 +21,6 @@
 
 #include "util/simd/bits.h"
 #include "vec/columns/column_nullable.h"
-#include "vec/columns/column_set.h"
 #include "vec/core/field.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/functions/simple_function_factory.h"
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 34b98a9967..e22e9277af 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -528,10 +528,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
         std::vector<SipHash> siphashs(rows);
         // result[j] means column index, i means rows index
         for (int j = 0; j < result_size; ++j) {
-            auto column = block->get_by_position(result[j]).column;
-            for (int i = 0; i < rows; ++i) {
-                column->update_hash_with_value(i, siphashs[i]);
-            }
+            
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
         }
 
         // channel2rows' subscript means channel id


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to