This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e413a2b8e9 [Opt](vectorized) Use new way to do hash shuffle to speed up query (#12586)
e413a2b8e9 is described below

commit e413a2b8e914ed13cf1fd6c44e7c6b3cce9c2dd8
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Sep 15 11:08:04 2022 +0800

    [Opt](vectorized) Use new way to do hash shuffle to speed up query (#12586)
---
 be/src/util/hash_util.hpp                          | 13 ++++
 be/src/vec/columns/column.h                        | 21 ++++--
 be/src/vec/columns/column_array.cpp                |  5 --
 be/src/vec/columns/column_array.h                  |  2 -
 be/src/vec/columns/column_complex.h                |  5 --
 be/src/vec/columns/column_const.cpp                | 18 ++++-
 be/src/vec/columns/column_const.h                  |  5 +-
 be/src/vec/columns/column_decimal.cpp              | 12 +++-
 be/src/vec/columns/column_decimal.h                |  6 +-
 be/src/vec/columns/column_dummy.h                  |  5 --
 be/src/vec/columns/column_nullable.cpp             | 18 ++++-
 be/src/vec/columns/column_nullable.h               |  4 +-
 be/src/vec/columns/column_string.cpp               |  2 +-
 be/src/vec/columns/column_string.h                 | 13 +++-
 be/src/vec/columns/column_vector.cpp               | 12 +++-
 be/src/vec/columns/column_vector.h                 |  5 +-
 be/src/vec/sink/vdata_stream_sender.cpp            | 80 ++++++++++++----------
 be/src/vec/sink/vdata_stream_sender.h              | 16 +++--
 .../java/org/apache/doris/qe/SessionVariable.java  |  5 ++
 gensrc/thrift/PaloInternalService.thrift           |  2 +
 20 files changed, 173 insertions(+), 76 deletions(-)

diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index 0ebcc6e11b..4ee40a06bc 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -33,6 +33,7 @@
 #elif __aarch64__
 #include <sse2neon.h>
 #endif
+#include <xxh3.h>
 #include <zlib.h>
 
 #include "gen_cpp/Types_types.h"
@@ -363,6 +364,18 @@ public:
         std::hash<T> hasher;
         seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
     }
+
+    // xxHash function for a byte array.  For convenience, a 64-bit seed is 
also
+    // hashed into the result.  The mapping may change from time to time.
+    static xxh_u64 xxHash64WithSeed(const char* s, size_t len, xxh_u64 seed) {
+        return XXH3_64bits_withSeed(s, len, seed);
+    }
+
+    // same as the function above, but for a null value: hashes a fixed int 0 with the seed
+    static xxh_u64 xxHash64NullWithSeed(xxh_u64 seed) {
+        static const int INT_VALUE = 0;
+        return XXH3_64bits_withSeed(reinterpret_cast<const char*>(&INT_VALUE), 
sizeof(int), seed);
+    }
 };
 
 } // namespace doris
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 9334c97476..f58cfa8c0e 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -323,23 +323,36 @@ public:
         LOG(FATAL) << "deserialize_vec_with_null_map not supported";
     }
 
+    /// TODO: SipHash is slower than CityHash or xxHash; reconsider whether a new interface is needed
     /// Update state of hash function with value of n-th element.
     /// On subsequent calls of this method for sequence of column values of 
arbitrary types,
     ///  passed bytes to hash must identify sequence of values unambiguously.
-    virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0;
+    virtual void update_hash_with_value(size_t n, SipHash& hash) const {
+        LOG(FATAL) << "update_hash_with_value siphash not supported";
+    }
+
+    /// Update state of hash function with value of n elements to avoid the 
virtual function call
+    /// null_data to mark whether need to do hash compute, null_data == nullptr
+    /// means all element need to do hash function, else only *null_data != 0 
need to do hash func
+    /// do SipHash here; this path is slower than the uint64_t* xxHash overload below
+    virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                          const uint8_t* __restrict null_data 
= nullptr) const {
+        LOG(FATAL) << "update_hashes_with_value siphash not supported";
+    };
 
     /// Update state of hash function with value of n elements to avoid the 
virtual function call
     /// null_data to mark whether need to do hash compute, null_data == nullptr
     /// means all element need to do hash function, else only *null_data != 0 
need to do hash func
-    virtual void update_hashes_with_value(std::vector<SipHash>& hash,
+    /// do xxHash here, faster than other sip hash
+    virtual void update_hashes_with_value(uint64_t* __restrict hashes,
                                           const uint8_t* __restrict null_data 
= nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value not supported";
+        LOG(FATAL) << "update_hashes_with_value xxhash not supported";
     };
 
     /// Update state of crc32 hash function with value of n elements to avoid 
the virtual function call
     /// null_data to mark whether need to do hash compute, null_data == nullptr
     /// means all element need to do hash function, else only *null_data != 0 
need to do hash func
-    virtual void update_crcs_with_value(std::vector<uint32_t>& hash, 
PrimitiveType type,
+    virtual void update_crcs_with_value(std::vector<uint64_t>& hash, 
PrimitiveType type,
                                         const uint8_t* __restrict null_data = 
nullptr) const {
         LOG(FATAL) << "update_crcs_with_value not supported";
     };
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index f9b1c53467..510b1fb681 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -228,11 +228,6 @@ void ColumnArray::update_hash_with_value(size_t n, 
SipHash& hash) const {
     for (size_t i = 0; i < array_size; ++i) 
get_data().update_hash_with_value(offset + i, hash);
 }
 
-void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
-                                           const uint8_t* __restrict 
null_data) const {
-    SIP_HASHES_FUNCTION_COLUMN_IMPL();
-}
-
 void ColumnArray::insert(const Field& x) {
     const Array& array = doris::vectorized::get<const Array&>(x);
     size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 2f8df6d83d..4d8e8f4d3e 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -98,8 +98,6 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     void update_hash_with_value(size_t n, SipHash& hash) const override;
-    void update_hashes_with_value(std::vector<SipHash>& hashes,
-                                  const uint8_t* __restrict null_data) const 
override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src_, size_t n) override;
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index d7e99406e5..2a2c21f19b 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -188,11 +188,6 @@ public:
         // TODO add hash function
     }
 
-    void update_hashes_with_value(std::vector<SipHash>& hash,
-                                  const uint8_t* __restrict null_data) const 
override {
-        // TODO add hash function
-    }
-
     [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
                                 int nan_direction_hint) const override {
         LOG(FATAL) << "compare_at not implemented";
diff --git a/be/src/vec/columns/column_const.cpp 
b/be/src/vec/columns/column_const.cpp
index 4bd72ccf8d..dc7142693e 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -104,7 +104,7 @@ void 
ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes,
     }
 }
 
-void ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes, 
doris::PrimitiveType type,
+void ColumnConst::update_crcs_with_value(std::vector<uint64_t>& hashes, 
doris::PrimitiveType type,
                                          const uint8_t* __restrict null_data) 
const {
     DCHECK(null_data == nullptr);
     DCHECK(hashes.size() == size());
@@ -121,6 +121,22 @@ void 
ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes, doris::P
     }
 }
 
+void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict 
null_data) const {
+    DCHECK(null_data == nullptr);
+    auto real_data = data->get_data_at(0);
+    auto real_size = size();
+    if (real_data.data == nullptr) {
+        for (int i = 0; i < real_size; ++i) {
+            hashes[i] = HashUtil::xxHash64NullWithSeed(hashes[i]);
+        }
+    } else {
+        for (int i = 0; i < real_size; ++i) {
+            hashes[i] = HashUtil::xxHash64WithSeed(real_data.data, 
real_data.size, hashes[i]);
+        }
+    }
+}
+
 MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& 
selector) const {
     if (s != selector.size()) {
         LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of 
column ({})",
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 3f4e735780..f001150bee 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -130,9 +130,12 @@ public:
     void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const 
override;
 
-    void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType 
type,
+    void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
 
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const 
override;
+
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     ColumnPtr replicate(const Offsets& offsets) const override;
     void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index 9b6470a371..690714b2ba 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -128,7 +128,7 @@ void 
ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
 }
 
 template <typename T>
-void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, 
PrimitiveType type,
+void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, 
PrimitiveType type,
                                               const uint8_t* __restrict 
null_data) const {
     auto s = hashes.size();
     DCHECK(s == size());
@@ -160,6 +160,16 @@ void 
ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, Pri
     }
 }
 
+template <typename T>
+void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
+                                                const uint8_t* __restrict 
null_data) const {
+    auto s = size();
+    for (int i = 0; i < s; i++) {
+        hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const 
char*>(&data[i]), sizeof(T),
+                                               hashes[i]);
+    }
+}
+
 template <typename T>
 void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int,
                                        IColumn::Permutation& res) const {
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 41dba1827d..9957742c0d 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -154,9 +154,11 @@ public:
                                        const uint8_t* null_map) override;
 
     void update_hash_with_value(size_t n, SipHash& hash) const override;
-    void update_hashes_with_value(std::vector<SipHash>& hash,
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const 
override;
-    void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType 
type,
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const 
override;
+    void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
 
     int compare_at(size_t n, size_t m, const IColumn& rhs_, int 
nan_direction_hint) const override;
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index 96547bfd00..957b333189 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -72,11 +72,6 @@ public:
         return pos;
     }
 
-    void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const 
override {}
-
-    void update_hashes_with_value(std::vector<SipHash>& hashes,
-                                  const uint8_t* __restrict null_data) const 
override {};
-
     void insert_from(const IColumn&, size_t) override { ++s; }
 
     void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t 
length) override {
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index b6bf95f449..3d35f24e61 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -25,7 +25,6 @@
 #include "vec/common/arena.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/nan_utils.h"
-#include "vec/common/sip_hash.h"
 #include "vec/common/typeid_cast.h"
 #include "vec/core/sort_block.h"
 
@@ -73,7 +72,7 @@ void 
ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes,
     }
 }
 
-void ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes,
+void ColumnNullable::update_crcs_with_value(std::vector<uint64_t>& hashes,
                                             doris::PrimitiveType type,
                                             const uint8_t* __restrict 
null_data) const {
     DCHECK(null_data == nullptr);
@@ -92,6 +91,21 @@ void 
ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes,
     }
 }
 
+void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes,
+                                              const uint8_t* __restrict 
null_data) const {
+    DCHECK(null_data == nullptr);
+    auto s = size();
+    auto* __restrict real_null_data = assert_cast<const 
ColumnUInt8&>(*null_map).get_data().data();
+    if (doris::simd::count_zero_num(reinterpret_cast<const 
int8_t*>(real_null_data), s) == s) {
+        nested_column->update_hashes_with_value(hashes, nullptr);
+    } else {
+        for (int i = 0; i < s; ++i) {
+            if (real_null_data[i] != 0) hashes[i] = 
HashUtil::xxHash64NullWithSeed(hashes[i]);
+        }
+        nested_column->update_hashes_with_value(hashes, real_null_data);
+    }
+}
+
 MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
     MutableColumnPtr new_nested_col = 
get_nested_column().clone_resized(new_size);
     auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 723e88c8b6..2d3d36f7bc 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -163,8 +163,10 @@ public:
     void update_hash_with_value(size_t n, SipHash& hash) const override;
     void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const 
override;
-    void update_crcs_with_value(std::vector<uint32_t>& hash, PrimitiveType 
type,
+    void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const 
override;
     void get_extremes(Field& min, Field& max) const override;
 
     MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) 
const override {
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index 9d2d43b48a..87fa8da649 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -111,7 +111,7 @@ void ColumnString::insert_indices_from(const IColumn& src, 
const int* indices_be
     }
 }
 
-void ColumnString::update_crcs_with_value(std::vector<uint32_t>& hashes, 
doris::PrimitiveType type,
+void ColumnString::update_crcs_with_value(std::vector<uint64_t>& hashes, 
doris::PrimitiveType type,
                                           const uint8_t* __restrict null_data) 
const {
     auto s = hashes.size();
     DCHECK(s == size());
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 9cae0509cc..8dc597e18c 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -243,9 +243,20 @@ public:
         SIP_HASHES_FUNCTION_COLUMN_IMPL();
     }
 
-    void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType 
type,
+    void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
 
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const 
override {
+        auto s = size();
+        for (int i = 0; i < s; i++) {
+            size_t string_size = size_at(i);
+            size_t offset = offset_at(i);
+            hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const 
char*>(&chars[offset]),
+                                                   string_size, hashes[i]);
+        }
+    }
+
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
 
     void insert_indices_from(const IColumn& src, const int* indices_begin,
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 1678e5d7f4..e80b4009c6 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -111,6 +111,16 @@ void 
ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
     SIP_HASHES_FUNCTION_COLUMN_IMPL();
 }
 
+template <typename T>
+void ColumnVector<T>::update_hashes_with_value(uint64_t* __restrict hashes,
+                                               const uint8_t* __restrict 
null_data) const {
+    auto s = size();
+    for (int i = 0; i < s; i++) {
+        hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const 
char*>(&data[i]), sizeof(T),
+                                               hashes[i]);
+    }
+}
+
 template <typename T>
 void ColumnVector<T>::sort_column(const ColumnSorter* sorter, EqualFlags& 
flags,
                                   IColumn::Permutation& perms, EqualRange& 
range,
@@ -119,7 +129,7 @@ void ColumnVector<T>::sort_column(const ColumnSorter* 
sorter, EqualFlags& flags,
 }
 
 template <typename T>
-void ColumnVector<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, 
PrimitiveType type,
+void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, 
PrimitiveType type,
                                              const uint8_t* __restrict 
null_data) const {
     auto s = hashes.size();
     DCHECK(s == size());
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 447886d08f..020c771057 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -250,9 +250,12 @@ public:
     void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const 
override;
 
-    void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType 
type,
+    void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
 
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const 
override;
+
     size_t byte_size() const override { return data.size() * sizeof(data[0]); }
 
     size_t allocated_bytes() const override { return data.allocated_bytes(); }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index fc01d8a632..94a4e99163 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -410,6 +410,9 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
         shuffle(_channels.begin(), _channels.end(), g);
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
                _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        if (_state->query_options().__isset.enable_new_shuffle_hash_method) {
+            _new_shuffle_hash_method = 
_state->query_options().enable_new_shuffle_hash_method;
+        }
         RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, 
_row_desc));
     } else {
         RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, 
_row_desc));
@@ -495,7 +498,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
             current_channel->ch_roll_pb_block();
         }
         _current_channel_idx = (_current_channel_idx + 1) % _channels.size();
-    } else if (_part_type == TPartitionType::HASH_PARTITIONED) {
+    } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
+               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         // will only copy schema
         // we don't want send temp columns
         auto column_to_keep = block->columns();
@@ -506,45 +510,49 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block) {
 
         // vectorized calculate hash
         int rows = block->rows();
-        // for each row, we have a siphash val
-        std::vector<SipHash> siphashs(rows);
-        // result[j] means column index, i means rows index
-        for (int j = 0; j < result_size; ++j) {
-            
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
-        }
-
-        // channel2rows' subscript means channel id
-        std::vector<vectorized::UInt64> hash_vals(rows);
-        for (int i = 0; i < rows; i++) {
-            hash_vals[i] = siphashs[i].get64();
-        }
+        auto element_size = _channels.size();
+        std::vector<uint64_t> hash_vals(rows);
+        auto* __restrict hashes = hash_vals.data();
+
+        // TODO: once the new shuffle hash method is fully supported, simplify this code
+        if (_part_type == TPartitionType::HASH_PARTITIONED) {
+            if (!_new_shuffle_hash_method) {
+                // for each row, we have a siphash val
+                std::vector<SipHash> siphashs(rows);
+                // result[j] means column index, i means rows index
+                for (int j = 0; j < result_size; ++j) {
+                    
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
+                }
+                for (int i = 0; i < rows; i++) {
+                    hashes[i] = siphashs[i].get64() % element_size;
+                }
+            } else {
+                // result[j] is a column index and i is a row index; compute the xxHash value here
+                for (int j = 0; j < result_size; ++j) {
+                    
block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
+                }
 
-        Block::erase_useless_column(block, column_to_keep);
-        RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), 
hash_vals, rows, block));
-    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        // will only copy schema
-        // we don't want send temp columns
-        auto column_to_keep = block->columns();
-        // 1. calculate hash
-        // 2. dispatch rows to channel
-        int result_size = _partition_expr_ctxs.size();
-        int result[result_size];
-        RETURN_IF_ERROR(get_partition_column_result(block, result));
+                for (int i = 0; i < rows; i++) {
+                    hashes[i] = hashes[i] % element_size;
+                }
+            }
 
-        // vectorized calculate hash val
-        int rows = block->rows();
-        // for each row, we have a hash_val
-        std::vector<uint32_t> hash_vals(rows);
+            Block::erase_useless_column(block, column_to_keep);
+            RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, 
rows, block));
+        } else {
+            for (int j = 0; j < result_size; ++j) {
+                
block->get_by_position(result[j]).column->update_crcs_with_value(
+                        hash_vals, 
_partition_expr_ctxs[j]->root()->type().type);
+            }
+            element_size = _channel_shared_ptrs.size();
+            for (int i = 0; i < rows; i++) {
+                hashes[i] = hashes[i] % element_size;
+            }
 
-        // result[j] means column index, i means rows index
-        for (int j = 0; j < result_size; ++j) {
-            block->get_by_position(result[j]).column->update_crcs_with_value(
-                    hash_vals, _partition_expr_ctxs[j]->root()->type().type);
+            Block::erase_useless_column(block, column_to_keep);
+            RETURN_IF_ERROR(
+                    channel_add_rows(_channel_shared_ptrs, element_size, 
hashes, rows, block));
         }
-
-        Block::erase_useless_column(block, column_to_keep);
-        RETURN_IF_ERROR(channel_add_rows(_channel_shared_ptrs, 
_channel_shared_ptrs.size(),
-                                         hash_vals, rows, block));
     } else {
         // Range partition
         // 1. calculate range
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index e537749dd7..0d44d3152b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -88,9 +88,9 @@ protected:
         return Status::OK();
     }
 
-    template <typename Channels, typename HashVals>
-    Status channel_add_rows(Channels& channels, int num_channels, const 
HashVals& hash_vals,
-                            int rows, Block* block);
+    template <typename Channels>
+    Status channel_add_rows(Channels& channels, int num_channels, uint64_t* 
channel_ids, int rows,
+                            Block* block);
 
     struct hash_128 {
         uint64_t high;
@@ -152,6 +152,8 @@ protected:
     bool _transfer_large_data_by_brpc = false;
 
     segment_v2::CompressionTypePB _compression_type;
+
+    bool _new_shuffle_hash_method = false;
 };
 
 // TODO: support local exechange
@@ -311,14 +313,14 @@ private:
     bool _enable_local_exchange = false;
 };
 
-template <typename Channels, typename HashVals>
+template <typename Channels>
 Status VDataStreamSender::channel_add_rows(Channels& channels, int 
num_channels,
-                                           const HashVals& hash_vals, int 
rows, Block* block) {
+                                           uint64_t* __restrict channel_ids, 
int rows,
+                                           Block* block) {
     std::vector<int> channel2rows[num_channels];
 
     for (int i = 0; i < rows; i++) {
-        auto cid = hash_vals[i] % num_channels;
-        channel2rows[cid].emplace_back(i);
+        channel2rows[channel_ids[i]].emplace_back(i);
     }
 
     for (int i = 0; i < num_channels; ++i) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e27f5ff2f7..b8c32effb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -217,6 +217,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String SKIP_DELETE_PREDICATE = "skip_delete_predicate";
 
+    public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = 
"enable_new_shuffle_hash_method";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -555,6 +557,8 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER)
     public boolean enableFallbackToOriginalPlanner = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD)
+    public boolean enableNewShffleHashMethod = true;
 
     public String getBlockEncryptionMode() {
         return blockEncryptionMode;
@@ -1153,6 +1157,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
         
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
         tResult.setEnableLocalExchange(enableLocalExchange);
+        tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod);
 
         tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index d9911e6141..d25a8f04e9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -175,6 +175,8 @@ struct TQueryOptions {
 
   // For debug purpose, skip delete predicates when reading data
   49: optional bool skip_delete_predicate = false
+
+  50: optional bool enable_new_shuffle_hash_method = true
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to