This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch dev_join in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8b3a79cb5fced02a1dccf6242f95558583b95bd5 Author: BiteTheDDDDt <[email protected]> AuthorDate: Thu Oct 26 13:40:52 2023 +0800 update update update fix --- be/src/exprs/runtime_filter_slots.h | 5 ++-- .../local_exchange_sink_operator.cpp | 4 +-- be/src/vec/columns/column_vector.cpp | 13 ++------- be/src/vec/common/hash_table/hash_map.h | 7 ++++- be/src/vec/common/hash_table/hash_map_context.h | 32 ++++++++++++++-------- .../vec/exec/join/process_hash_table_probe_impl.h | 2 +- be/src/vec/exec/join/vhash_join_node.h | 6 ++-- 7 files changed, 36 insertions(+), 33 deletions(-) diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 63c5665d271..0f841e5a60f 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -174,6 +174,7 @@ public: auto column = it->get_by_position(result_column_id).column; std::vector<int> indexs; + // indexs start from 1 because the first row is mocked for join hash map if (const auto* nullable = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) { column = nullable->get_nested_column_ptr(); @@ -181,14 +182,14 @@ public: nullable->get_null_map_column_ptr().get()) ->get_data() .data(); - for (int i = 0; i < column->size(); i++) { + for (int i = 1; i < column->size(); i++) { if (null_map[i]) { continue; } indexs.push_back(i); } } else { - for (int i = 0; i < column->size(); i++) { + for (int i = 1; i < column->size(); i++) { indexs.push_back(i); } } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index ec959b20cea..c2fec25e995 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -37,7 +37,7 @@ Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state, vectorized::Block* block, SourceState source_state) { auto& data_queue = _shared_state->data_queue; - std::vector<int> channel2rows[data_queue.size()]; + std::vector<uint32_t> channel2rows[data_queue.size()]; auto rows = block->rows(); for (int i = 0; i < rows; i++) { @@ -48,7 +48,7 @@ Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state, _mutable_block[i] = vectorized::MutableBlock::create_unique(block->clone_empty()); } - const int* begin = channel2rows[i].data(); + const auto* begin = channel2rows[i].data(); _mutable_block[i]->add_rows(block, begin, begin + channel2rows[i].size()); if (_mutable_block[i]->rows() > state->batch_size() || source_state == SourceState::FINISHED) { diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index c4ca97df7a2..69c09dcde80 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -375,17 +375,8 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data); - if constexpr (std::is_same_v<T, UInt8>) { - // nullmap : indices_begin[i] == 0 means is null at the here, set true here - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = - (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]]; - } - } else { - // real data : indices_begin[i] == 0 what at is meaningless - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = src_data[indices_begin[i]]; - } + for (int i = 0; i < new_size; ++i) { + data[origin_size + i] = src_data[indices_begin[i]]; } } diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 1c132fc99ed..cedcb0515a6 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -336,6 +336,10 @@ private: uint32_t build_idx = 0; const auto batch_size = max_batch_size; + if (!build_keys) { + probe_idx = probe_rows; + } + auto do_the_probe = [&]() { while (build_idx && LIKELY(matched_cnt < batch_size)) { if (keys[probe_idx] == build_keys[build_idx]) { @@ -381,7 +385,8 @@ private: return std::pair {probe_idx, matched_cnt}; } - const Key* __restrict build_keys; + const Key* __restrict build_keys = nullptr; + std::vector<uint8_t> visited; int max_batch_size = 0; diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 549301ae477..01bdd88fc74 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -153,6 +153,10 @@ struct MethodSerialized : public MethodBase<TData> { using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, typename Base::Mapped>; using Base::try_presis_key; + // need keep until the hash probe end. + std::vector<StringRef> build_stored_keys; + Arena build_arena; + // refresh each time probe std::vector<StringRef> stored_keys; StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size, @@ -167,10 +171,10 @@ struct MethodSerialized : public MethodBase<TData> { return {begin, sum_size}; } - void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, - const uint8_t* null_map = nullptr, bool is_build = false) override { - Base::arena.clear(); - stored_keys.resize(num_rows); + void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows, + std::vector<StringRef>& keys, Arena& arena) { + arena.clear(); + keys.resize(num_rows); size_t max_one_row_byte_size = 0; for (const auto& column : key_columns) { @@ -182,24 +186,28 @@ struct MethodSerialized : public MethodBase<TData> { // reach mem limit, don't serialize in batch size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { - stored_keys[i] = - serialize_keys_to_pool_contiguous(i, keys_size, key_columns, Base::arena); + keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, arena); } } else { - auto* serialized_key_buffer = - reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes)); + auto* serialized_key_buffer = reinterpret_cast<uint8_t*>(arena.alloc(total_bytes)); for (size_t i = 0; i < num_rows; ++i) { - stored_keys[i].data = + keys[i].data = reinterpret_cast<char*>(serialized_key_buffer + i * max_one_row_byte_size); - stored_keys[i].size = 0; + keys[i].size = 0; } for (const auto& column : key_columns) { - column->serialize_vec(stored_keys, num_rows, max_one_row_byte_size); + column->serialize_vec(keys, num_rows, max_one_row_byte_size); } } - Base::keys = stored_keys.data(); + Base::keys = keys.data(); + } + + void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows, + const uint8_t* null_map = nullptr, bool is_build = false) override { + init_serialized_keys_impl(key_columns, num_rows, is_build ? build_stored_keys : stored_keys, + is_build ? build_arena : Base::arena); Base::init_hash_values(num_rows, null_map); } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index a267a96d55f..8827d51783d 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -68,7 +68,7 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column( constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - if (!is_semi_anti_join || have_other_join_conjunct) { + if ((!is_semi_anti_join || have_other_join_conjunct) && size) { for (int i = 0; i < _right_col_len; i++) { const auto& column = *_build_block->safe_get_by_position(i).column; if (output_slot_flags[i]) { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 6fb1703120e..a514f454214 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -87,10 +87,8 @@ struct ProcessRuntimeFilterBuild { state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality)); if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) { - { - SCOPED_TIMER(parent->_push_compute_timer); - parent->_runtime_filter_slots->insert(parent->_inserted_blocks); - } + SCOPED_TIMER(parent->_push_compute_timer); + parent->_runtime_filter_slots->insert(parent->_inserted_blocks); } { SCOPED_TIMER(parent->_push_down_timer); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
