This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e413a2b8e9 [Opt](vectorized) Use new way to do hash shffle to speed up query (#12586) e413a2b8e9 is described below commit e413a2b8e914ed13cf1fd6c44e7c6b3cce9c2dd8 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Sep 15 11:08:04 2022 +0800 [Opt](vectorized) Use new way to do hash shffle to speed up query (#12586) --- be/src/util/hash_util.hpp | 13 ++++ be/src/vec/columns/column.h | 21 ++++-- be/src/vec/columns/column_array.cpp | 5 -- be/src/vec/columns/column_array.h | 2 - be/src/vec/columns/column_complex.h | 5 -- be/src/vec/columns/column_const.cpp | 18 ++++- be/src/vec/columns/column_const.h | 5 +- be/src/vec/columns/column_decimal.cpp | 12 +++- be/src/vec/columns/column_decimal.h | 6 +- be/src/vec/columns/column_dummy.h | 5 -- be/src/vec/columns/column_nullable.cpp | 18 ++++- be/src/vec/columns/column_nullable.h | 4 +- be/src/vec/columns/column_string.cpp | 2 +- be/src/vec/columns/column_string.h | 13 +++- be/src/vec/columns/column_vector.cpp | 12 +++- be/src/vec/columns/column_vector.h | 5 +- be/src/vec/sink/vdata_stream_sender.cpp | 80 ++++++++++++---------- be/src/vec/sink/vdata_stream_sender.h | 16 +++-- .../java/org/apache/doris/qe/SessionVariable.java | 5 ++ gensrc/thrift/PaloInternalService.thrift | 2 + 20 files changed, 173 insertions(+), 76 deletions(-) diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp index 0ebcc6e11b..4ee40a06bc 100644 --- a/be/src/util/hash_util.hpp +++ b/be/src/util/hash_util.hpp @@ -33,6 +33,7 @@ #elif __aarch64__ #include <sse2neon.h> #endif +#include <xxh3.h> #include <zlib.h> #include "gen_cpp/Types_types.h" @@ -363,6 +364,18 @@ public: std::hash<T> hasher; seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2); } + + // xxHash function for a byte array. For convenience, a 64-bit seed is also + // hashed into the result. The mapping may change from time to time. + static xxh_u64 xxHash64WithSeed(const char* s, size_t len, xxh_u64 seed) { + return XXH3_64bits_withSeed(s, len, seed); + } + + // same to the up function, just for null value + static xxh_u64 xxHash64NullWithSeed(xxh_u64 seed) { + static const int INT_VALUE = 0; + return XXH3_64bits_withSeed(reinterpret_cast<const char*>(&INT_VALUE), sizeof(int), seed); + } }; } // namespace doris diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 9334c97476..f58cfa8c0e 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -323,23 +323,36 @@ public: LOG(FATAL) << "deserialize_vec_with_null_map not supported"; } + /// TODO: SipHash is slower than city or xx hash, rethink we should have a new interface /// Update state of hash function with value of n-th element. /// On subsequent calls of this method for sequence of column values of arbitrary types, /// passed bytes to hash must identify sequence of values unambiguously. - virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0; + virtual void update_hash_with_value(size_t n, SipHash& hash) const { + LOG(FATAL) << "update_hash_with_value siphash not supported"; + } + + /// Update state of hash function with value of n elements to avoid the virtual function call + /// null_data to mark whether need to do hash compute, null_data == nullptr + /// means all element need to do hash function, else only *null_data != 0 need to do hash func + /// do xxHash here, faster than other hash method + virtual void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data = nullptr) const { + LOG(FATAL) << "update_hashes_with_value siphash not supported"; + }; /// Update state of hash function with value of n elements to avoid the virtual function call /// null_data to mark whether need to do hash compute, null_data == nullptr /// means all element need to do hash function, else only *null_data != 0 need to do hash func - virtual void update_hashes_with_value(std::vector<SipHash>& hash, + /// do xxHash here, faster than other sip hash + virtual void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_hashes_with_value not supported"; + LOG(FATAL) << "update_hashes_with_value xxhash not supported"; }; /// Update state of crc32 hash function with value of n elements to avoid the virtual function call /// null_data to mark whether need to do hash compute, null_data == nullptr /// means all element need to do hash function, else only *null_data != 0 need to do hash func - virtual void update_crcs_with_value(std::vector<uint32_t>& hash, PrimitiveType type, + virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, const uint8_t* __restrict null_data = nullptr) const { LOG(FATAL) << "update_crcs_with_value not supported"; }; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index f9b1c53467..510b1fb681 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -228,11 +228,6 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const { for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash); } -void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes, - const uint8_t* __restrict null_data) const { - SIP_HASHES_FUNCTION_COLUMN_IMPL(); -} - void ColumnArray::insert(const Field& x) { const Array& array = doris::vectorized::get<const Array&>(x); size_t size = array.size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 2f8df6d83d..4d8e8f4d3e 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -98,8 +98,6 @@ public: StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; void update_hash_with_value(size_t n, SipHash& hash) const override; - void update_hashes_with_value(std::vector<SipHash>& hashes, - const uint8_t* __restrict null_data) const override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index d7e99406e5..2a2c21f19b 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -188,11 +188,6 @@ public: // TODO add hash function } - void update_hashes_with_value(std::vector<SipHash>& hash, - const uint8_t* __restrict null_data) const override { - // TODO add hash function - } - [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs, int nan_direction_hint) const override { LOG(FATAL) << "compare_at not implemented"; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 4bd72ccf8d..dc7142693e 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -104,7 +104,7 @@ void ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes, } } -void ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes, doris::PrimitiveType type, +void ColumnConst::update_crcs_with_value(std::vector<uint64_t>& hashes, doris::PrimitiveType type, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); DCHECK(hashes.size() == size()); @@ -121,6 +121,22 @@ void ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes, doris::P } } +void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto real_data = data->get_data_at(0); + auto real_size = size(); + if (real_data.data == nullptr) { + for (int i = 0; i < real_size; ++i) { + hashes[i] = HashUtil::xxHash64NullWithSeed(hashes[i]); + } + } else { + for (int i = 0; i < real_size; ++i) { + hashes[i] = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, hashes[i]); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 3f4e735780..f001150bee 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -130,9 +130,12 @@ public: void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; - void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, + void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr replicate(const Offsets& offsets) const override; void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 9b6470a371..690714b2ba 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -128,7 +128,7 @@ void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes, } template <typename T> -void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, +void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const { auto s = hashes.size(); DCHECK(s == size()); @@ -160,6 +160,16 @@ void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, Pri } } +template <typename T> +void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + auto s = size(); + for (int i = 0; i < s; i++) { + hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]), sizeof(T), + hashes[i]); + } +} + template <typename T> void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int, IColumn::Permutation& res) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 41dba1827d..9957742c0d 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -154,9 +154,11 @@ public: const uint8_t* null_map) override; void update_hash_with_value(size_t n, SipHash& hash) const override; - void update_hashes_with_value(std::vector<SipHash>& hash, + void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; - void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const override; + void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override; diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index 96547bfd00..957b333189 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -72,11 +72,6 @@ public: return pos; } - void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const override {} - - void update_hashes_with_value(std::vector<SipHash>& hashes, - const uint8_t* __restrict null_data) const override {}; - void insert_from(const IColumn&, size_t) override { ++s; } void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t length) override { diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index b6bf95f449..3d35f24e61 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -25,7 +25,6 @@ #include "vec/common/arena.h" #include "vec/common/assert_cast.h" #include "vec/common/nan_utils.h" -#include "vec/common/sip_hash.h" #include "vec/common/typeid_cast.h" #include "vec/core/sort_block.h" @@ -73,7 +72,7 @@ void ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes, } } -void ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes, +void ColumnNullable::update_crcs_with_value(std::vector<uint64_t>& hashes, doris::PrimitiveType type, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); @@ -92,6 +91,21 @@ void ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes, } } +void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = size(); + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(real_null_data), s) == s) { + nested_column->update_hashes_with_value(hashes, nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) hashes[i] = HashUtil::xxHash64NullWithSeed(hashes[i]); + } + nested_column->update_hashes_with_value(hashes, real_null_data); + } +} + MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const { MutableColumnPtr new_nested_col = get_nested_column().clone_resized(new_size); auto new_null_map = ColumnUInt8::create(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 723e88c8b6..2d3d36f7bc 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -163,8 +163,10 @@ public: void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; - void update_crcs_with_value(std::vector<uint32_t>& hash, PrimitiveType type, + void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const override; void get_extremes(Field& min, Field& max) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 9d2d43b48a..87fa8da649 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -111,7 +111,7 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be } } -void ColumnString::update_crcs_with_value(std::vector<uint32_t>& hashes, doris::PrimitiveType type, +void ColumnString::update_crcs_with_value(std::vector<uint64_t>& hashes, doris::PrimitiveType type, const uint8_t* __restrict null_data) const { auto s = hashes.size(); DCHECK(s == size()); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 9cae0509cc..8dc597e18c 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -243,9 +243,20 @@ public: SIP_HASHES_FUNCTION_COLUMN_IMPL(); } - void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, + void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const override { + auto s = size(); + for (int i = 0; i < s; i++) { + size_t string_size = size_at(i); + size_t offset = offset_at(i); + hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&chars[offset]), + string_size, hashes[i]); + } + } + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert_indices_from(const IColumn& src, const int* indices_begin, diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 1678e5d7f4..e80b4009c6 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -111,6 +111,16 @@ void ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes, SIP_HASHES_FUNCTION_COLUMN_IMPL(); } +template <typename T> +void ColumnVector<T>::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + auto s = size(); + for (int i = 0; i < s; i++) { + hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]), sizeof(T), + hashes[i]); + } +} + template <typename T> void ColumnVector<T>::sort_column(const ColumnSorter* sorter, EqualFlags& flags, IColumn::Permutation& perms, EqualRange& range, @@ -119,7 +129,7 @@ void ColumnVector<T>::sort_column(const ColumnSorter* sorter, EqualFlags& flags, } template <typename T> -void ColumnVector<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, +void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const { auto s = hashes.size(); DCHECK(s == size()); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 447886d08f..020c771057 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -250,9 +250,12 @@ public: void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; - void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType type, + void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const override; + size_t byte_size() const override { return data.size() * sizeof(data[0]); } size_t allocated_bytes() const override { return data.allocated_bytes(); } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fc01d8a632..94a4e99163 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -410,6 +410,9 @@ Status VDataStreamSender::prepare(RuntimeState* state) { shuffle(_channels.begin(), _channels.end(), g); } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + if (_state->query_options().__isset.enable_new_shuffle_hash_method) { + _new_shuffle_hash_method = _state->query_options().enable_new_shuffle_hash_method; + } RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc)); } else { RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc)); @@ -495,7 +498,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { current_channel->ch_roll_pb_block(); } _current_channel_idx = (_current_channel_idx + 1) % _channels.size(); - } else if (_part_type == TPartitionType::HASH_PARTITIONED) { + } else if (_part_type == TPartitionType::HASH_PARTITIONED || + _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { // will only copy schema // we don't want send temp columns auto column_to_keep = block->columns(); @@ -506,45 +510,49 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { // vectorized calculate hash int rows = block->rows(); - // for each row, we have a siphash val - std::vector<SipHash> siphashs(rows); - // result[j] means column index, i means rows index - for (int j = 0; j < result_size; ++j) { - block->get_by_position(result[j]).column->update_hashes_with_value(siphashs); - } - - // channel2rows' subscript means channel id - std::vector<vectorized::UInt64> hash_vals(rows); - for (int i = 0; i < rows; i++) { - hash_vals[i] = siphashs[i].get64(); - } + auto element_size = _channels.size(); + std::vector<uint64_t> hash_vals(rows); + auto* __restrict hashes = hash_vals.data(); + + // TODO: after we support new shuffle hash method, should simple the code + if (_part_type == TPartitionType::HASH_PARTITIONED) { + if (!_new_shuffle_hash_method) { + // for each row, we have a siphash val + std::vector<SipHash> siphashs(rows); + // result[j] means column index, i means rows index + for (int j = 0; j < result_size; ++j) { + block->get_by_position(result[j]).column->update_hashes_with_value(siphashs); + } + for (int i = 0; i < rows; i++) { + hashes[i] = siphashs[i].get64() % element_size; + } + } else { + // result[j] means column index, i means rows index, here to calculate the xxhash value + for (int j = 0; j < result_size; ++j) { + block->get_by_position(result[j]).column->update_hashes_with_value(hashes); + } - Block::erase_useless_column(block, column_to_keep); - RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), hash_vals, rows, block)); - } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { - // will only copy schema - // we don't want send temp columns - auto column_to_keep = block->columns(); - // 1. calculate hash - // 2. dispatch rows to channel - int result_size = _partition_expr_ctxs.size(); - int result[result_size]; - RETURN_IF_ERROR(get_partition_column_result(block, result)); + for (int i = 0; i < rows; i++) { + hashes[i] = hashes[i] % element_size; + } + } - // vectorized calculate hash val - int rows = block->rows(); - // for each row, we have a hash_val - std::vector<uint32_t> hash_vals(rows); + Block::erase_useless_column(block, column_to_keep); + RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block)); + } else { + for (int j = 0; j < result_size; ++j) { + block->get_by_position(result[j]).column->update_crcs_with_value( + hash_vals, _partition_expr_ctxs[j]->root()->type().type); + } + element_size = _channel_shared_ptrs.size(); + for (int i = 0; i < rows; i++) { + hashes[i] = hashes[i] % element_size; + } - // result[j] means column index, i means rows index - for (int j = 0; j < result_size; ++j) { - block->get_by_position(result[j]).column->update_crcs_with_value( - hash_vals, _partition_expr_ctxs[j]->root()->type().type); + Block::erase_useless_column(block, column_to_keep); + RETURN_IF_ERROR( + channel_add_rows(_channel_shared_ptrs, element_size, hashes, rows, block)); } - - Block::erase_useless_column(block, column_to_keep); - RETURN_IF_ERROR(channel_add_rows(_channel_shared_ptrs, _channel_shared_ptrs.size(), - hash_vals, rows, block)); } else { // Range partition // 1. calculate range diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index e537749dd7..0d44d3152b 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -88,9 +88,9 @@ protected: return Status::OK(); } - template <typename Channels, typename HashVals> - Status channel_add_rows(Channels& channels, int num_channels, const HashVals& hash_vals, - int rows, Block* block); + template <typename Channels> + Status channel_add_rows(Channels& channels, int num_channels, uint64_t* channel_ids, int rows, + Block* block); struct hash_128 { uint64_t high; @@ -152,6 +152,8 @@ protected: bool _transfer_large_data_by_brpc = false; segment_v2::CompressionTypePB _compression_type; + + bool _new_shuffle_hash_method = false; }; // TODO: support local exechange @@ -311,14 +313,14 @@ private: bool _enable_local_exchange = false; }; -template <typename Channels, typename HashVals> +template <typename Channels> Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels, - const HashVals& hash_vals, int rows, Block* block) { + uint64_t* __restrict channel_ids, int rows, + Block* block) { std::vector<int> channel2rows[num_channels]; for (int i = 0; i < rows; i++) { - auto cid = hash_vals[i] % num_channels; - channel2rows[cid].emplace_back(i); + channel2rows[channel_ids[i]].emplace_back(i); } for (int i = 0; i < num_channels; ++i) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index e27f5ff2f7..b8c32effb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -217,6 +217,8 @@ public class SessionVariable implements Serializable, Writable { public static final String SKIP_DELETE_PREDICATE = "skip_delete_predicate"; + public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = "enable_new_shuffle_hash_method"; + // session origin value public Map<Field, String> sessionOriginValue = new HashMap<Field, String>(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -555,6 +557,8 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER) public boolean enableFallbackToOriginalPlanner = true; + @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD) + public boolean enableNewShffleHashMethod = true; public String getBlockEncryptionMode() { return blockEncryptionMode; @@ -1153,6 +1157,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFunctionPushdown(enableFunctionPushdown); tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); tResult.setEnableLocalExchange(enableLocalExchange); + tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod); tResult.setSkipStorageEngineMerge(skipStorageEngineMerge); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d9911e6141..d25a8f04e9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -175,6 +175,8 @@ struct TQueryOptions { // For debug purpose, skip delete predicates when reading data 49: optional bool skip_delete_predicate = false + + 50: optional bool enable_new_shuffle_hash_method = true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org