This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b35cfc5d5e [opt](join) Opt the performance of join probe (#21845) b35cfc5d5e is described below commit b35cfc5d5ea16f86a47e0b897bc7c9518d6290a4 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Jul 19 01:21:22 2023 +0800 [opt](join) Opt the performance of join probe (#21845) --- be/src/vec/columns/column.h | 3 +- be/src/vec/columns/column_array.cpp | 33 +++++++------- be/src/vec/columns/column_array.h | 3 +- be/src/vec/columns/column_complex.h | 20 +++------ be/src/vec/columns/column_const.cpp | 5 +-- be/src/vec/columns/column_const.h | 3 +- be/src/vec/columns/column_decimal.cpp | 14 +++--- be/src/vec/columns/column_decimal.h | 3 +- be/src/vec/columns/column_nullable.cpp | 8 ++-- be/src/vec/columns/column_nullable.h | 3 +- be/src/vec/columns/column_string.cpp | 46 ++++++++----------- be/src/vec/columns/column_string.h | 3 +- be/src/vec/columns/column_struct.cpp | 10 +---- be/src/vec/columns/column_struct.h | 3 +- be/src/vec/columns/column_vector.cpp | 20 ++++----- be/src/vec/columns/column_vector.h | 3 +- be/src/vec/exec/join/process_hash_table_probe.h | 2 +- .../vec/exec/join/process_hash_table_probe_impl.h | 52 ++++++++++++++-------- .../array/function_array_with_constant.cpp | 2 +- be/test/vec/core/column_array_test.cpp | 8 ++-- 20 files changed, 109 insertions(+), 135 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 0ff14a1653..84d035946d 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -488,8 +488,7 @@ public: * If `begin` and `count_sz` specified, it means elements in range [`begin`, `begin` + `count_sz`) will be replicated. * If `count_sz` is -1, `begin` must be 0. */ - virtual void replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin = 0, int count_sz = -1) const { + virtual void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { LOG(FATAL) << "not support"; } diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index b575f7bf15..94ec98c104 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -864,32 +864,29 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) cons return replicate_generic(replicate_offsets); } -void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t col_size = count_sz < 0 ? size() : count_sz; - if (col_size == 0) { +void ColumnArray::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { + if (target_size == 0) { return; } + auto total_size = get_offsets().size(); // |---------------------|-------------------------|-------------------------| // [0, begin) [begin, begin + count_sz) [begin + count_sz, size()) // do not need to copy copy counts[n] times do not need to copy - IColumn::Offsets replicate_offsets(get_offsets().size(), 0); - size_t cur_offset = 0; - size_t end = begin + col_size; + IColumn::Offsets replicate_offsets(total_size, 0); // copy original data at offset n counts[n] times - for (size_t i = begin; i < end; ++i) { - cur_offset += counts[i]; - replicate_offsets[i] = cur_offset; - } - // ignored - for (size_t i = end; i < size(); ++i) { - replicate_offsets[i] = replicate_offsets[i - 1]; + auto begin = 0, end = 0; + while (begin < target_size) { + while (end < target_size && indexs[begin] == indexs[end]) { + end++; + } + long index = indexs[begin]; + replicate_offsets[index] = end - begin; + begin = end; } - if (cur_offset != target_size) { - LOG(WARNING) << "ColumnArray replicate input target_size:" << target_size - << " not equal SUM(counts):" << cur_offset; - return; + // ignored + for (size_t i = 1; i < total_size; ++i) { + replicate_offsets[i] += replicate_offsets[i - 1]; } auto rep_res = replicate(replicate_offsets); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 4fe1827e17..18bdc74bc1 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -177,8 +177,7 @@ public: size_t allocated_bytes() const override; void protect() override; ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; ColumnPtr convert_to_full_column_if_const() const override; void get_extremes(Field& min, Field& max) const override { LOG(FATAL) << "get_extremes not implemented"; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 138f3d0fb3..211de96df6 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -271,8 +271,7 @@ public: ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns, const IColumn::Selector& selector) const override { @@ -410,21 +409,14 @@ ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const } template <typename T> -void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t size = count_sz < 0 ? data.size() : count_sz; - if (0 == size) return; - +void ColumnComplexType<T>::replicate(const uint32_t* indexs, size_t target_size, + IColumn& column) const { auto& res = reinterpret_cast<ColumnComplexType<T>&>(column); typename Self::Container& res_data = res.get_data(); - res_data.reserve(target_size); + res_data.resize(target_size); - size_t end = size + begin; - for (size_t i = begin; i < end; ++i) { - size_t size_to_replicate = counts[i]; - for (size_t j = 0; j < size_to_replicate; ++j) { - res_data.push_back(data[i]); - } + for (size_t i = 0; i < target_size; ++i) { + res_data[i] = data[indexs[i]]; } } diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index 13124dbd23..d8dbae40f4 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -78,11 +78,10 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) const { return ColumnConst::create(data, replicated_size); } -void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { +void ColumnConst::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { if (s == 0) return; auto& res = reinterpret_cast<ColumnConst&>(column); - res.s = s; + res.s = target_size; } ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const { diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 7554e773b9..3f33ee5792 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -191,8 +191,7 @@ public: size_t filter(const Filter& filter) override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index a73be249eb..2c98e1193b 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -440,18 +440,14 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets& offsets) const { } template <typename T> -void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t size = count_sz < 0 ? data.size() : count_sz; - if (0 == size) return; - +void ColumnDecimal<T>::replicate(const uint32_t* __restrict indexs, size_t target_size, + IColumn& column) const { auto& res = reinterpret_cast<ColumnDecimal<T>&>(column); typename Self::Container& res_data = res.get_data(); - res_data.reserve(target_size); + res_data.resize(target_size); - size_t end = size + begin; - for (size_t i = begin; i < end; ++i) { - res_data.add_num_element_without_reserve(data[i], counts[i]); + for (size_t i = 0; i < target_size; ++i) { + res_data[i] = data[indexs[i]]; } } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 404c8d7019..aae9ab94d1 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -227,8 +227,7 @@ public: ColumnPtr replicate(const IColumn::Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; TypeIndex get_data_type() const override { return TypeId<T>::value; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 538bcd27a6..bda5029c31 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -578,12 +578,10 @@ ColumnPtr ColumnNullable::replicate(const Offsets& offsets) const { return ColumnNullable::create(replicated_data, replicated_null_map); } -void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { +void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, IColumn& column) const { auto& res = reinterpret_cast<ColumnNullable&>(column); - get_nested_column().replicate(counts, target_size, res.get_nested_column(), begin, count_sz); - get_null_map_column().replicate(counts, target_size, res.get_null_map_column(), begin, - count_sz); + get_nested_column().replicate(counts, target_size, res.get_nested_column()); + get_null_map_column().replicate(counts, target_size, res.get_null_map_column()); } template <bool negative> diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 11c24be294..c3ee24600e 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -213,8 +213,7 @@ public: size_t allocated_bytes() const override; void protect() override; ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint64_t& hash, diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 3277e62894..4ea25b7688 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -446,39 +446,29 @@ ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const { return res; } -void ColumnString::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t col_size = count_sz < 0 ? size() : count_sz; - if (0 == col_size) { - return; - } - +void ColumnString::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { auto& res = reinterpret_cast<ColumnString&>(column); Chars& res_chars = res.chars; Offsets& res_offsets = res.offsets; - res_chars.reserve(chars.size() / col_size * target_size); - res_offsets.reserve(target_size); - - size_t base = begin > 0 ? offsets[begin - 1] : 0; - Offset prev_string_offset = 0 + base; - Offset current_new_offset = 0; - - size_t end = begin + col_size; - for (size_t i = begin; i < end; ++i) { - size_t size_to_replicate = counts[i]; - size_t string_size = offsets[i] - prev_string_offset; - - for (size_t j = 0; j < size_to_replicate; ++j) { - current_new_offset += string_size; - res_offsets.push_back(current_new_offset); - res_chars.resize(res_chars.size() + string_size); - memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() - string_size], - &chars[prev_string_offset], string_size); - } - - prev_string_offset = offsets[i]; + size_t byte_size = 0; + res_offsets.resize(target_size); + for (size_t i = 0; i < target_size; ++i) { + long row_idx = indexs[i]; + auto str_size = offsets[row_idx] - offsets[row_idx - 1]; + res_offsets[i] = res_offsets[i - 1] + str_size; + byte_size += str_size; + } + + res_chars.resize(byte_size); + auto* __restrict dest = res.chars.data(); + auto* __restrict src = chars.data(); + for (size_t i = 0; i < target_size; ++i) { + long row_idx = indexs[i]; + auto str_size = offsets[row_idx] - offsets[row_idx - 1]; + memcpy_small_allow_read_write_overflow15(dest + res_offsets[i - 1], + src + offsets[row_idx - 1], str_size); } check_chars_length(res_chars.size(), res_offsets.size()); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index ac95f78037..14c426c762 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -506,8 +506,7 @@ public: ColumnPtr replicate(const Offsets& replicate_offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return scatter_impl<ColumnString>(num_columns, selector); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 0b3bcb24e8..78c438a5e1 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -294,18 +294,12 @@ ColumnPtr ColumnStruct::replicate(const Offsets& offsets) const { return ColumnStruct::create(new_columns); } -void ColumnStruct::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t col_size = count_sz < 0 ? size() : count_sz; - if (0 == col_size) { - return; - } - +void ColumnStruct::replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const { auto& res = reinterpret_cast<ColumnStruct&>(column); res.columns.resize(columns.size()); for (size_t i = 0; i != columns.size(); ++i) { - columns[i]->replicate(counts, target_size, *res.columns[i], begin, count_sz); + columns[i]->replicate(indexs, target_size, *res.columns[i]); } } diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 9073725e81..700b5801c3 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -143,8 +143,7 @@ public: Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override; ColumnPtr permute(const Permutation& perm, size_t limit) const override; ColumnPtr replicate(const Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* counts, size_t target_size, IColumn& column) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override; // ColumnPtr index(const IColumn & indexes, size_t limit) const override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 388b436bc5..82555638d7 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -545,18 +545,18 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets& offsets) const { } template <typename T> -void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, IColumn& column, - size_t begin, int count_sz) const { - size_t size = count_sz < 0 ? data.size() : count_sz; - if (size == 0) return; - +void ColumnVector<T>::replicate(const uint32_t* __restrict indexs, size_t target_size, + IColumn& column) const { auto& res = reinterpret_cast<ColumnVector<T>&>(column); typename Self::Container& res_data = res.get_data(); - res_data.reserve(target_size); - - size_t end = begin + size; - for (size_t i = begin; i < end; ++i) { - res_data.add_num_element_without_reserve(data[i], counts[i]); + DCHECK(res_data.empty()); + res_data.resize(target_size); + auto* __restrict left = res_data.data(); + auto* __restrict right = data.data(); + auto* __restrict idxs = indexs; + + for (size_t i = 0; i < target_size; ++i) { + left[i] = right[idxs[i]]; } } diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index 48228822f3..b8c119a217 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -425,8 +425,7 @@ public: ColumnPtr replicate(const IColumn::Offsets& offsets) const override; - void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0, - int count_sz = -1) const override; + void replicate(const uint32_t* indexs, size_t target_size, IColumn& column) const override; void get_extremes(Field& min, Field& max) const override; diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index db43daf20d..79e126584c 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -88,7 +88,7 @@ struct ProcessHashTableProbe { std::unique_ptr<Arena> _arena; std::vector<StringRef> _probe_keys; - std::vector<uint32_t> _items_counts; + std::vector<uint32_t> _probe_indexs; std::vector<int8_t> _build_block_offsets; std::vector<int> _build_block_rows; std::vector<std::pair<int8_t, int>> _build_blocks_locs; diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 4e9180bed9..d859632701 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -136,8 +136,8 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( if (all_match_one) { mcol[i]->insert_range_from(*column, last_probe_index, probe_size); } else { - DCHECK_GE(_items_counts.size(), last_probe_index + probe_size); - column->replicate(&_items_counts[0], size, *mcol[i], last_probe_index, probe_size); + DCHECK_GE(_probe_indexs.size(), last_probe_index + probe_size); + column->replicate(&_probe_indexs[0], size, *mcol[i]); } } else { mcol[i]->insert_many_defaults(size); @@ -216,10 +216,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c bool is_mark_join) { auto& probe_index = _join_node->_probe_index; auto& probe_raw_ptrs = _join_node->_probe_columns; - if (probe_index == 0 && _items_counts.size() < probe_rows) { - _items_counts.resize(probe_rows); - } + _probe_indexs.resize(_batch_size); if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); @@ -264,13 +262,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c if (LIKELY(current_offset < _build_block_rows.size())) { _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; _build_block_rows[current_offset] = probe_row_match_iter->row_num; + _probe_indexs[current_offset] = probe_index; } else { _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); _build_block_rows.emplace_back(probe_row_match_iter->row_num); + _probe_indexs.template emplace_back(probe_index); } ++current_offset; } - _items_counts[probe_index] = current_offset; all_match_one &= (current_offset == 1); if (!probe_row_match_iter.ok()) { ++probe_index; @@ -284,19 +283,19 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c if constexpr (ignore_null && need_null_map_for_probe) { if ((*null_map)[probe_index]) { if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; // only full outer / left outer need insert the data of right table if (LIKELY(current_offset < _build_block_rows.size())) { _build_block_offsets[current_offset] = -1; _build_block_rows[current_offset] = -1; + _probe_indexs[current_offset] = probe_index; } else { _build_block_offsets.emplace_back(-1); _build_block_rows.emplace_back(-1); + _probe_indexs.template emplace_back(probe_index); } ++current_offset; - } else { - _items_counts[probe_index++] = (uint32_t)0; } + probe_index++; all_match_one = false; if constexpr (probe_all) { if (current_offset >= _batch_size) { @@ -415,7 +414,16 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c } uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[current_probe_index] = count; + if (LIKELY(current_offset < _probe_indexs.size())) { + for (int i = last_offset; i < current_offset; ++i) { + _probe_indexs[i] = current_probe_index; + } + } else { + for (int i = last_offset; i < _probe_indexs.size(); ++i) { + _probe_indexs[i] = current_probe_index; + } + _probe_indexs.resize(current_offset, current_probe_index); + } all_match_one &= (count == 1); if (current_offset >= _batch_size) { break; @@ -451,10 +459,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( Block* output_block, size_t probe_rows, bool is_mark_join) { auto& probe_index = _join_node->_probe_index; auto& probe_raw_ptrs = _join_node->_probe_columns; - if (probe_index == 0 && _items_counts.size() < probe_rows) { - _items_counts.resize(probe_rows); - } if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { + _probe_indexs.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); } @@ -501,9 +507,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) { if (LIKELY(current_offset < _build_block_rows.size())) { + _probe_indexs[current_offset] = probe_index; _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; _build_block_rows[current_offset] = probe_row_match_iter->row_num; } else { + _probe_indexs.template emplace_back(probe_index); _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); _build_block_rows.emplace_back(probe_row_match_iter->row_num); } @@ -517,7 +525,6 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( row_count_from_last_probe = current_offset; all_match_one &= (current_offset == 1); - _items_counts[probe_index] = current_offset; if (!probe_row_match_iter.ok()) { ++probe_index; is_the_last_sub_block = true; @@ -532,21 +539,21 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( if constexpr (ignore_null && need_null_map_for_probe) { if ((*null_map)[probe_index]) { if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); // only full outer / left outer need insert the data of right table if (LIKELY(current_offset < _build_block_rows.size())) { + _probe_indexs[current_offset] = probe_index; _build_block_offsets[current_offset] = -1; _build_block_rows[current_offset] = -1; } else { + _probe_indexs.template emplace_back(probe_index); _build_block_offsets.emplace_back(-1); _build_block_rows.emplace_back(-1); } ++current_offset; - } else { - _items_counts[probe_index++] = (uint32_t)0; } + probe_index++; all_match_one = false; if constexpr (probe_all) { if (current_offset >= _batch_size) { @@ -676,7 +683,16 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( ++probe_index; } uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[current_probe_index] = count; + if (LIKELY(current_offset < _probe_indexs.size())) { + for (int i = last_offset; i < current_offset; ++i) { + _probe_indexs[i] = current_probe_index; + } + } else { + for (int i = last_offset; i < _probe_indexs.size(); ++i) { + _probe_indexs[i] = current_probe_index; + } + _probe_indexs.resize(current_offset, current_probe_index); + } all_match_one &= (count == 1); if (current_offset >= _batch_size) { break; diff --git a/be/src/vec/functions/array/function_array_with_constant.cpp b/be/src/vec/functions/array/function_array_with_constant.cpp index e1a0c4f15e..9e3885b1c3 100644 --- a/be/src/vec/functions/array/function_array_with_constant.cpp +++ b/be/src/vec/functions/array/function_array_with_constant.cpp @@ -98,7 +98,7 @@ public: } offset += array_size; offsets.push_back(offset); - array_sizes.push_back(array_size); + array_sizes.resize(array_sizes.size() + array_size, i); } auto clone = value->clone_empty(); clone->reserve(input_rows_count); diff --git a/be/test/vec/core/column_array_test.cpp b/be/test/vec/core/column_array_test.cpp index a87bea1cb8..fd2ed21273 100644 --- a/be/test/vec/core/column_array_test.cpp +++ b/be/test/vec/core/column_array_test.cpp @@ -200,8 +200,8 @@ TEST(ColumnArrayTest, IntArrayReplicateTest) { } ColumnArray array_column(std::move(data_column), std::move(off_column)); - uint32_t counts[] = {2, 1, 0, 3}; // size should be equal array_column.size() - size_t target_size = 6; // sum(counts) + uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal array_column.size() + size_t target_size = 6; // sum(counts) // return array column: [[1,2,3],[1,2,3],[],[5,6],[5,6],[5,6]]; auto res1 = array_column.clone_empty(); @@ -224,8 +224,8 @@ TEST(ColumnArrayTest, StringArrayReplicateTest) { } ColumnArray array_column(std::move(data_column), std::move(off_column)); - uint32_t counts[] = {2, 1, 0, 3}; // size should be equal array_column.size() - size_t target_size = 6; // sum(counts) + uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal array_column.size() + size_t target_size = 6; // sum(counts) // return array column: [["abc","d"],["abc","d"],["ef"],[""],[""],[""]]; auto res1 = array_column.clone_empty(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org