This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 54d1630c42 [Opt](vectorized) speed up hash function compute in hash partition (#12334) 54d1630c42 is described below commit 54d1630c429ad8222f09a2b718eb29010317605c Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Sep 7 10:11:40 2022 +0800 [Opt](vectorized) speed up hash function compute in hash partition (#12334) After do the opt of hash function, the compute of siphash in HASH_PARTITION in vdata_stream_sender Before: 1s800ms After: 800ms --- be/src/vec/columns/column.h | 21 +++++++++++++++++++++ be/src/vec/columns/column_array.cpp | 6 +++++- be/src/vec/columns/column_array.h | 2 ++ be/src/vec/columns/column_complex.h | 6 ++++++ be/src/vec/columns/column_const.cpp | 17 +++++++++++++++++ be/src/vec/columns/column_const.h | 3 +++ be/src/vec/columns/column_decimal.cpp | 6 ++++++ be/src/vec/columns/column_decimal.h | 3 +++ be/src/vec/columns/column_dummy.h | 3 +++ be/src/vec/columns/column_nullable.cpp | 17 +++++++++++++++++ be/src/vec/columns/column_nullable.h | 2 ++ be/src/vec/columns/column_string.h | 6 ++++++ be/src/vec/columns/column_vector.cpp | 6 ++++++ be/src/vec/columns/column_vector.h | 3 +++ be/src/vec/exprs/vruntimefilter_wrapper.cpp | 1 - be/src/vec/sink/vdata_stream_sender.cpp | 5 +---- 16 files changed, 101 insertions(+), 6 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 565a9416d7..fc3ab1478d 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -30,6 +30,19 @@ class SipHash; +#define SIP_HASHES_FUNCTION_COLUMN_IMPL() \ + auto s = hashes.size(); \ + DCHECK(s == size()); \ + if (null_data == nullptr) { \ + for (size_t i = 0; i < s; i++) { \ + update_hash_with_value(i, hashes[i]); \ + } \ + } else { \ + for (size_t i = 0; i < s; i++) { \ + if (null_data[i] == 0) update_hash_with_value(i, hashes[i]); \ + } \ + } + namespace doris::vectorized { class Arena; @@ -286,6 +299,14 @@ public: /// passed bytes to hash must identify sequence of values unambiguously. virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0; + /// Update state of hash function with value of n elements to avoid the virtual function call + /// null_data to mark whether need to do hash compute, null_data == nullptr + /// means all element need to do hash function, else only *null_data != 0 need to do hash func + virtual void update_hashes_with_value(std::vector<SipHash>& hash, + const uint8_t* __restrict null_data = nullptr) const { + LOG(FATAL) << "update_hashes_with_value not supported"; + }; + /** Removes elements that don't match the filter. * Is used in WHERE and HAVING operations. * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index c498b72345..db52f62c63 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -219,10 +219,14 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const { size_t array_size = size_at(n); size_t offset = offset_at(n); - hash.update(array_size); for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash); } +void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + void ColumnArray::insert(const Field& x) { const Array& array = doris::vectorized::get<const Array&>(x); size_t size = array.size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 686089f4e9..820064505d 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -83,6 +83,8 @@ public: StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 81e826d5b9..d7e99406e5 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -183,10 +183,16 @@ public: LOG(FATAL) << "deserialize_and_insert_from_arena not implemented"; } + // maybe we do not need to impl the function void update_hash_with_value(size_t n, SipHash& hash) const override { // TODO add hash function } + void update_hashes_with_value(std::vector<SipHash>& hash, + const uint8_t* __restrict null_data) const override { + // TODO add hash function + } + [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs, int nan_direction_hint) const override { LOG(FATAL) << "compare_at not implemented"; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index efb790249a..946074cf61 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -22,6 +22,7 @@ #include "vec/columns/columns_common.h" #include "vec/common/pod_array.h" +#include "vec/common/sip_hash.h" #include "vec/common/typeid_cast.h" namespace doris::vectorized { @@ -86,6 +87,22 @@ ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const { return ColumnConst::create(data, limit); } +void ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + DCHECK(hashes.size() == size()); + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + for (auto& hash : hashes) { + hash.update(0); + } + } else { + for (auto& hash : hashes) { + hash.update(real_data.data, real_data.size); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 615c936a5e..be7f56ab23 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -132,6 +132,9 @@ public: data->update_hash_with_value(0, hash); } + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr replicate(const Offsets& offsets) const override; void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 1e34056286..91145e4473 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -120,6 +120,12 @@ void ColumnDecimal<T>::update_hash_with_value(size_t n, SipHash& hash) const { hash.update(data[n]); } +template <typename T> +void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + template <typename T> void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int, IColumn::Permutation& res) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index a43227bd41..2b3dac0ae5 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -154,6 +154,9 @@ public: const uint8_t* null_map) override; void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_hashes_with_value(std::vector<SipHash>& hash, + const uint8_t* __restrict null_data) const override; + int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation& res) const override; diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index e7fb657161..96547bfd00 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -74,6 +74,9 @@ public: void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const override {} + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override {}; + void insert_from(const IColumn&, size_t) override { ++s; } void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t length) override { diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 14eebe3150..cff4d0f7c3 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -20,6 +20,7 @@ #include "vec/columns/column_nullable.h" +#include "util/simd/bits.h" #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" @@ -50,6 +51,22 @@ void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const { get_nested_column().update_hash_with_value(n, hash); } +void ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = hashes.size(); + DCHECK(s == size()); + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(real_null_data), s) == s) { + nested_column->update_hashes_with_value(hashes, nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) hashes[i].update(0); + } + nested_column->update_hashes_with_value(hashes, real_null_data); + } +} + MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const { MutableColumnPtr new_nested_col = get_nested_column().clone_resized(new_size); auto new_null_map = ColumnUInt8::create(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 05807e10ec..397f3be1bd 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -159,6 +159,8 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; void get_extremes(Field& min, Field& max) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index d8a704e362..d2744e20b7 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -246,10 +246,16 @@ public: size_t string_size = size_at(n); size_t offset = offset_at(n); + // TODO: Rethink we really need to update the string_size? hash.update(reinterpret_cast<const char*>(&string_size), sizeof(string_size)); hash.update(reinterpret_cast<const char*>(&chars[offset]), string_size); } + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); + } + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert_indices_from(const IColumn& src, const int* indices_begin, diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 279a777992..13c5810026 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -105,6 +105,12 @@ void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const { hash.update(data[n]); } +template <typename T> +void ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + template <typename T> struct ColumnVector<T>::less { const Self& parent; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index ad39b814f2..5101894b92 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -247,6 +247,9 @@ public: void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + size_t byte_size() const override { return data.size() * sizeof(data[0]); } size_t allocated_bytes() const override { return data.allocated_bytes(); } diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 869de36a26..bbd7cf576c 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -21,7 +21,6 @@ #include "util/simd/bits.h" #include "vec/columns/column_nullable.h" -#include "vec/columns/column_set.h" #include "vec/core/field.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/functions/simple_function_factory.h" diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 34b98a9967..e22e9277af 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -528,10 +528,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { std::vector<SipHash> siphashs(rows); // result[j] means column index, i means rows index for (int j = 0; j < result_size; ++j) { - auto column = block->get_by_position(result[j]).column; - for (int i = 0; i < rows; ++i) { - column->update_hash_with_value(i, siphashs[i]); - } + block->get_by_position(result[j]).column->update_hashes_with_value(siphashs); } // channel2rows' subscript means channel id --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org