This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b7d6a70868 [FIX](datatype) Implement hash func with array/map/struct type (#21334) b7d6a70868 is described below commit b7d6a7086819c0e6f8198c68cd8e5e1fd84e64b1 Author: amory <wangqian...@selectdb.com> AuthorDate: Fri Jun 30 17:11:35 2023 +0800 [FIX](datatype) Implement hash func with array/map/struct type (#21334) Hash functions were not implemented for array/map/struct columns, so running SQL like the following made the BE crash with a core dump: select * from ( select bdp.nc_num, collect_list(distinct(bd.catalog_name)) as catalog_name, material_qty from dataease.bu_delivery_product bdp left join dataease.bu_trans_transfer btt on bdp.delivery_product_id = btt.delivery_product_id left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id where bd.val_status in ('10', '20', '30', '90') and bd.delivery_type in (0, 1, 2) group by nc_num, material_qty union ALL select bdp.nc_num, collect_list(distinct(bd.catalog_name)) as catalog_name, material_qty from dataease.bu_trans_transfer btt left join dataease.bu_delivery_product bdp on bdp.delivery_product_id = btt.delivery_product_id left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id where bd.val_status in ('10', '20', '30', '90') and bd.delivery_type in (0, 1, 2) group by nc_num, material_qty ) aa; Core dump stack trace: (not included in this email) --- be/src/vec/columns/column.h | 16 ++- be/src/vec/columns/column_array.cpp | 63 +++++++++ be/src/vec/columns/column_array.h | 14 ++ be/src/vec/columns/column_const.h | 14 ++ be/src/vec/columns/column_decimal.cpp | 31 +++-- be/src/vec/columns/column_decimal.h | 3 + be/src/vec/columns/column_map.cpp | 68 ++++++++- be/src/vec/columns/column_map.h | 13 ++ be/src/vec/columns/column_nullable.cpp | 18 +++ be/src/vec/columns/column_nullable.h | 9 +- be/src/vec/columns/column_string.h | 12 ++ be/src/vec/columns/column_struct.cpp | 31 +++++ be/src/vec/columns/column_struct.h | 13 ++ be/src/vec/columns/column_vector.h | 18 +++
be/src/vec/sink/vdata_stream_sender.cpp | 15 +- be/test/vec/columns/column_hash_func_test.cpp | 190 ++++++++++++++++++++++++++ 16 files changed, 502 insertions(+), 26 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 00f1824b99..b291ba2443 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -365,7 +365,7 @@ public: /// On subsequent calls of this method for sequence of column values of arbitrary types, /// passed bytes to hash must identify sequence of values unambiguously. virtual void update_hash_with_value(size_t n, SipHash& hash) const { - LOG(FATAL) << "update_hash_with_value siphash not supported"; + LOG(FATAL) << get_name() << " update_hash_with_value siphash not supported"; } /// Update state of hash function with value of n elements to avoid the virtual function call @@ -374,7 +374,7 @@ public: /// do xxHash here, faster than other hash method virtual void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_hashes_with_value siphash not supported"; + LOG(FATAL) << get_name() << " update_hashes_with_value siphash not supported"; } /// Update state of hash function with value of n elements to avoid the virtual function call @@ -383,7 +383,11 @@ public: /// do xxHash here, faster than other sip hash virtual void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_hashes_with_value xxhash not supported"; + LOG(FATAL) << get_name() << " update_hashes_with_value xxhash not supported"; + } + + virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const { + LOG(FATAL) << get_name() << " update_xxHash_with_value not supported"; } /// Update state of crc32 hash function with value of n elements to avoid the virtual function call @@ -391,7 +395,11 @@ public: /// means all element need to do hash function, else only *null_data
!= 0 need to do hash func virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_crcs_with_value not supported"; + LOG(FATAL) << get_name() << " update_crcs_with_value not supported"; + } + + virtual void update_crc_with_value(size_t n, uint64_t& hash) const { + LOG(FATAL) << get_name() << " update_crc_with_value not supported"; } /** Removes elements that don't match the filter. diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 4215bd36bd..c5e35e5bb9 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -271,6 +271,69 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const { for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash); } +void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + +// for every array row calculate xxHash +void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const { + size_t elem_size = size_at(n); + size_t offset = offset_at(n); + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size), + hash); + for (size_t i = 0; i < elem_size; ++i) { + get_data().update_xxHash_with_value(offset + i, hash); + } +} + +// for every array row calculate crcHash +void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const { + size_t elem_size = size_at(n); + size_t offset = offset_at(n); + + crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size), + crc); + for (size_t i = 0; i < elem_size; ++i) { + get_data().update_crc_with_value(offset + i, crc); + } +} + +void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + auto s = size(); + if (null_data) {
for (size_t i = 0; i < s; ++i) { + if (null_data[i] == 0) { + update_xxHash_with_value(i, hashes[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_xxHash_with_value(i, hashes[i]); + } + } +} + +void ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data) const { + auto s = hash.size(); + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_crc_with_value(i, hash[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_crc_with_value(i, hash[i]); + } + } +} + void ColumnArray::insert(const Field& x) { const Array& array = doris::vectorized::get<const Array&>(x); size_t size = array.size(); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 4f08c269fb..2e1c96a2c5 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -139,6 +139,18 @@ public: StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data = nullptr) const override; + + void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; @@ -240,6 +252,8 @@ public: ColumnPtr 
index(const IColumn& indexes, size_t limit) const override; private: + // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 0, offset[0] = 5, offset[1] = 8 + // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column array], offset[-1] = 0, offset[0] = 2, offset[1] = 3 WrappedPtr data; WrappedPtr offsets; diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index bb17f7eb04..feeb0608a2 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -152,6 +152,19 @@ public: data->serialize_vec(keys, num_rows, max_row_byte_size); } + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + hash = HashUtil::xxHash64NullWithSeed(hash); + } else { + hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, hash); + } + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + get_data_column_ptr()->update_crc_with_value(0, crc); + } + void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows, const uint8_t* null_map, size_t max_row_byte_size) const override { @@ -165,6 +178,7 @@ public: void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; + // (TODO.Amory) here may not use column_const update hash, and PrimitiveType is not used.
void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 069f195c4a..e0b8fef056 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -137,6 +137,19 @@ void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes, SIP_HASHES_FUNCTION_COLUMN_IMPL(); } +template <typename T> +void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const { + if constexpr (!IsDecimalV2<T>) { + crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } else { + const DecimalV2Value& dec_val = (const DecimalV2Value&)data[n]; + int64_t int_val = dec_val.int_value(); + int32_t frac_val = dec_val.frac_value(); + crc = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), crc); + crc = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), crc); + }; +} + template <typename T> void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const { @@ -146,27 +159,23 @@ void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, Pri if constexpr (!IsDecimalV2<T>) { DO_CRC_HASHES_FUNCTION_COLUMN_IMPL() } else { - DCHECK(type == TYPE_DECIMALV2); - auto decimalv2_do_crc = [&](size_t i) { - const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i]; - int64_t int_val = dec_val.int_value(); - int32_t frac_val = dec_val.frac_value(); - hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hashes[i]); - hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hashes[i]); - }; - if (null_data == nullptr) { for (size_t i = 0; i < s; i++) { - decimalv2_do_crc(i); + update_crc_with_value(i, hashes[i]); } } else { for (size_t i = 0; i < s; i++) { - if (null_data[i] == 0) decimalv2_do_crc(i); + if (null_data[i] == 0) update_crc_with_value(i, hashes[i]); } } } } 
+template <typename T> +void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const { + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash); +} + template <typename T> void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index f935c0c826..973f0bea68 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -184,6 +184,9 @@ public: void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation& res) const override; diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 4f06ee4dda..1924e2ba46 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -238,15 +238,79 @@ const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) { } void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const { - size_t array_size = size_at(n); + size_t kv_size = size_at(n); size_t offset = offset_at(n); - for (size_t i = 0; i < array_size; ++i) { + hash.update(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size)); + for (size_t i = 0; i < kv_size; ++i) { get_keys().update_hash_with_value(offset + i, hash); get_values().update_hash_with_value(offset + i, hash); } } +void ColumnMap::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + +void 
ColumnMap::update_xxHash_with_value(size_t n, uint64_t& hash) const { + size_t kv_size = size_at(n); + size_t offset = offset_at(n); + + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size), + hash); + for (size_t i = 0; i < kv_size; ++i) { + get_keys().update_xxHash_with_value(offset + i, hash); + get_values().update_xxHash_with_value(offset + i, hash); + } +} + +void ColumnMap::update_crc_with_value(size_t n, uint64_t& crc) const { + size_t kv_size = size_at(n); + size_t offset = offset_at(n); + + crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size), crc); + for (size_t i = 0; i < kv_size; ++i) { + get_keys().update_crc_with_value(offset + i, crc); + get_values().update_crc_with_value(offset + i, crc); + } +} + +void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const { + size_t s = size(); + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_xxHash_with_value(i, hashes[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_xxHash_with_value(i, hashes[i]); + } + } +} + +void ColumnMap::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data) const { + auto s = hash.size(); + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_crc_with_value(i, hash[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_crc_with_value(i, hash[i]); + } + } +} + void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) { if (length == 0) { return; diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 6cb83adf25..0d7bb2d0a7 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -39,6 +39,7 @@ #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" #include
"vec/common/cow.h" +#include "vec/common/sip_hash.h" #include "vec/common/string_ref.h" #include "vec/core/field.h" #include "vec/core/types.h" @@ -166,6 +167,18 @@ public: size_t allocated_bytes() const override; void protect() override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data = nullptr) const override; + + void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data = nullptr) const override; + /******************** keys and values ***************/ const ColumnPtr& get_keys_ptr() const { return keys_column; } ColumnPtr& get_keys_ptr() { return keys_column; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 42cda1d18d..ce5b68f3fb 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -65,6 +65,24 @@ MutableColumnPtr ColumnNullable::get_shrinked_column() { get_null_map_column_ptr()); } +void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const { + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (real_null_data[n] != 0) { + hash = HashUtil::xxHash64NullWithSeed(hash); + } else { + nested_column->update_xxHash_with_value(n, hash); + } +} + +void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const { + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (real_null_data[n] != 0) { + crc = HashUtil::zlib_crc_hash_null(crc); + } else { + nested_column->update_crc_with_value(n, crc); + } +} + void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const {
if (is_null_at(n)) hash.update(0); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index d5ca7f844b..be9ba72399 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -98,11 +98,7 @@ public: const char* get_family_name() const override { return "Nullable"; } std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; } MutableColumnPtr clone_resized(size_t size) const override; - size_t size() const override { - return nested_column->size( - - ); - } + size_t size() const override { return nested_column->size(); } bool is_null_at(size_t n) const override { return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0; } @@ -219,6 +215,9 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, int count_sz = -1) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index efd90fd844..703826cd24 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -398,6 +398,18 @@ public: void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows, const uint8_t* null_map) override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + size_t string_size = size_at(n); + size_t offset = offset_at(n); + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&chars[offset]), + string_size, hash); + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + auto 
data_ref = get_data_at(n); + crc = HashUtil::zlib_crc_hash(data_ref.data, data_ref.size, crc); + } + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 2a5a505fb8..58f5a4abaf 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -191,6 +191,37 @@ void ColumnStruct::update_hash_with_value(size_t n, SipHash& hash) const { } } +void ColumnStruct::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + +void ColumnStruct::update_xxHash_with_value(size_t n, uint64_t& hash) const { + for (const auto& column : columns) { + column->update_xxHash_with_value(n, hash); + } +} + +void ColumnStruct::update_crc_with_value(size_t n, uint64_t& crc) const { + for (const auto& column : columns) { + column->update_crc_with_value(n, crc); + } +} + +void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_hashes_with_value(hashes, null_data); + } +} + +void ColumnStruct::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_crcs_with_value(hash, type, null_data); + } +} + void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) { const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index dfa8bd6f5d..3771d29e48 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -35,6 +35,7 @@ #include "vec/columns/column.h" #include "vec/columns/column_impl.h" #include 
"vec/common/cow.h" +#include "vec/common/sip_hash.h" #include "vec/common/string_ref.h" #include "vec/core/field.h" #include "vec/core/types.h" @@ -103,7 +104,19 @@ public: void pop_back(size_t n) override; StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; + void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data = nullptr) const override; + + void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data = nullptr) const override; void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index c30796792e..67f2827c92 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -274,6 +274,24 @@ public: const uint8_t* null_map, size_t max_row_byte_size) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash); + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + if constexpr (!std::is_same_v<T, Int64>) { + crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } else { + if (this->is_date_type() || this->is_datetime_type()) { + char buf[64]; + const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[n]; + auto len = date_val.to_buffer(buf); + crc = HashUtil::zlib_crc_hash(buf, len, crc); + } else { + crc 
= HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } + } + } void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector<SipHash>& hashes, diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index df0bb396a3..b49c6ec92e 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -40,6 +40,7 @@ #include "runtime/types.h" #include "util/proto_util.h" #include "util/telemetry/telemetry.h" +#include "vec/columns/column_const.h" #include "vec/common/sip_hash.h" #include "vec/exprs/vexpr.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -629,7 +630,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { std::vector<SipHash> siphashs(rows); // result[j] means column index, i means rows index for (int j = 0; j < result_size; ++j) { - block->get_by_position(result[j]).column->update_hashes_with_value(siphashs); + // complex type most not implement get_data_at() method which column_const will call + unpack_if_const(block->get_by_position(result[j]).column) + .first->update_hashes_with_value(siphashs); } for (int i = 0; i < rows; i++) { hashes[i] = siphashs[i].get64() % element_size; @@ -638,7 +641,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { SCOPED_TIMER(_split_block_hash_compute_timer); // result[j] means column index, i means rows index, here to calculate the xxhash value for (int j = 0; j < result_size; ++j) { - block->get_by_position(result[j]).column->update_hashes_with_value(hashes); + // complex type most not implement get_data_at() method which column_const will call + unpack_if_const(block->get_by_position(result[j]).column) + .first->update_hashes_with_value(hashes); } for (int i = 0; i < rows; i++) { @@ -654,8 +659,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, 
hashes, rows, block)); } else { for (int j = 0; j < result_size; ++j) { - block->get_by_position(result[j]).column->update_crcs_with_value( - hash_vals, _partition_expr_ctxs[j]->root()->type().type); + // complex type most not implement get_data_at() method which column_const will call + unpack_if_const(block->get_by_position(result[j]).column) + .first->update_crcs_with_value( + hash_vals, _partition_expr_ctxs[j]->root()->type().type); } element_size = _channel_shared_ptrs.size(); for (int i = 0; i < rows; i++) { diff --git a/be/test/vec/columns/column_hash_func_test.cpp b/be/test/vec/columns/column_hash_func_test.cpp new file mode 100644 index 0000000000..0e409b4640 --- /dev/null +++ b/be/test/vec/columns/column_hash_func_test.cpp @@ -0,0 +1,190 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> + +#include "gtest/gtest_pred_impl.h" +#include "vec/columns/column_const.h" +#include "vec/core/field.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_date.h" +#include "vec/data_types/data_type_date_time.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_map.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/data_types/data_type_struct.h" + +namespace doris::vectorized { + +DataTypes create_scala_data_types() { + DataTypePtr dt = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()); + DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDate>()); + DataTypePtr dc = std::make_shared<DataTypeNullable>(vectorized::create_decimal(10, 2, false)); + DataTypePtr dcv2 = std::make_shared<DataTypeNullable>( + std::make_shared<DataTypeDecimal<vectorized::Decimal128>>(27, 9)); + DataTypePtr n3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>()); + DataTypePtr n1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()); + DataTypePtr s1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + + DataTypes dataTypes; + dataTypes.push_back(dt); + dataTypes.push_back(d); + dataTypes.push_back(dc); + dataTypes.push_back(dcv2); + dataTypes.push_back(n3); + dataTypes.push_back(n1); + dataTypes.push_back(s1); + + return dataTypes; +} + +TEST(HashFuncTest, ArrayTypeTest) { + DataTypes dataTypes = create_scala_data_types(); + + std::vector<uint64_t> sip_hash_vals(1); + std::vector<uint64_t> xx_hash_vals(1); + std::vector<uint64_t> crc_hash_vals(1); + auto* __restrict sip_hashes = sip_hash_vals.data(); + auto* __restrict xx_hashes = xx_hash_vals.data(); + auto* __restrict crc_hashes = crc_hash_vals.data(); + 
+ for (auto d : dataTypes) { + DataTypePtr a = std::make_shared<DataTypeArray>(d); + ColumnPtr col_a = a->create_column_const_with_default_value(1); + // sipHash + std::vector<SipHash> siphashs(1); + col_a->update_hashes_with_value(siphashs); + EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(siphashs)); + sip_hashes[0] = siphashs[0].get64(); + std::cout << sip_hashes[0] << std::endl; + // xxHash + EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(xx_hashes)); + std::cout << xx_hashes[0] << std::endl; + // crcHash + EXPECT_NO_FATAL_FAILURE( + col_a->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY)); + std::cout << crc_hashes[0] << std::endl; + } +} + +TEST(HashFuncTest, ArrayCornerCaseTest) { + DataTypes dataTypes = create_scala_data_types(); + + DataTypePtr d = std::make_shared<DataTypeInt64>(); + DataTypePtr a = std::make_shared<DataTypeArray>(d); + MutableColumnPtr array_mutable_col = a->create_column(); + Array a1, a2; + a1.push_back(Int64(1)); + a1.push_back(Int64(2)); + a1.push_back(Int64(3)); + array_mutable_col->insert(a1); + array_mutable_col->insert(a1); + a2.push_back(Int64(11)); + a2.push_back(Int64(12)); + a2.push_back(Int64(13)); + array_mutable_col->insert(a2); + + EXPECT_EQ(array_mutable_col->size(), 3); + + std::vector<uint64_t> sip_hash_vals(3); + std::vector<uint64_t> xx_hash_vals(3); + std::vector<uint64_t> crc_hash_vals(3); + auto* __restrict sip_hashes = sip_hash_vals.data(); + auto* __restrict xx_hashes = xx_hash_vals.data(); + auto* __restrict crc_hashes = crc_hash_vals.data(); + + // sipHash + std::vector<SipHash> siphashs(3); + array_mutable_col->update_hashes_with_value(siphashs); + EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(siphashs)); + sip_hashes[0] = siphashs[0].get64(); + sip_hashes[1] = siphashs[1].get64(); + sip_hashes[2] = siphashs[2].get64(); + EXPECT_EQ(sip_hashes[0], sip_hash_vals[1]); + EXPECT_TRUE(sip_hash_vals[0] != sip_hash_vals[2]); + // xxHash + 
EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(xx_hashes)); + EXPECT_EQ(xx_hashes[0], xx_hashes[1]); + EXPECT_TRUE(xx_hashes[0] != xx_hashes[2]); + // crcHash + EXPECT_NO_FATAL_FAILURE( + array_mutable_col->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY)); + EXPECT_EQ(crc_hashes[0], crc_hashes[1]); + EXPECT_TRUE(crc_hashes[0] != crc_hashes[2]); +} + +TEST(HashFuncTest, MapTypeTest) { + DataTypes dataTypes = create_scala_data_types(); + + std::vector<uint64_t> sip_hash_vals(1); + std::vector<uint64_t> xx_hash_vals(1); + std::vector<uint64_t> crc_hash_vals(1); + auto* __restrict sip_hashes = sip_hash_vals.data(); + auto* __restrict xx_hashes = xx_hash_vals.data(); + auto* __restrict crc_hashes = crc_hash_vals.data(); + // data_type_map + for (size_t i = 0; i + 1 < dataTypes.size(); ++i) { + DataTypePtr a = std::make_shared<DataTypeMap>(dataTypes[i], dataTypes[i + 1]); + ColumnPtr col_a = a->create_column_const_with_default_value(1); + // sipHash + std::vector<SipHash> siphashs(1); + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(siphashs)); + sip_hashes[0] = siphashs[0].get64(); + std::cout << sip_hashes[0] << std::endl; + // xxHash + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(xx_hashes)); + std::cout << xx_hashes[0] << std::endl; + // crcHash + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_crcs_with_value( + crc_hash_vals, PrimitiveType::TYPE_MAP)); + std::cout << crc_hashes[0] << std::endl; + } +} + +TEST(HashFuncTest, StructTypeTest) { + DataTypes dataTypes = create_scala_data_types(); + + std::vector<uint64_t> sip_hash_vals(1); + std::vector<uint64_t> xx_hash_vals(1); + std::vector<uint64_t> crc_hash_vals(1); + auto* __restrict sip_hashes = sip_hash_vals.data(); + auto* __restrict xx_hashes = xx_hash_vals.data(); + auto* __restrict crc_hashes = crc_hash_vals.data(); + + // data_type_struct + DataTypePtr a =
std::make_shared<DataTypeStruct>(dataTypes); + ColumnPtr col_a = a->create_column_const_with_default_value(1); + // sipHash + std::vector<SipHash> siphashs(1); + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(siphashs)); + sip_hashes[0] = siphashs[0].get64(); + std::cout << sip_hashes[0] << std::endl; + // xxHash + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(xx_hashes)); + std::cout << xx_hashes[0] << std::endl; + // crcHash + EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_crcs_with_value( + crc_hash_vals, PrimitiveType::TYPE_STRUCT)); + std::cout << crc_hashes[0] << std::endl; +} + +} // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org