This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch new_join2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 57160b4346fd48c5c597eb37e116f830a8658732 Author: BiteTheDDDDt <pxl...@qq.com> AuthorDate: Mon Nov 27 15:17:47 2023 +0800 fix --- be/src/vec/common/hash_table/hash_map.h | 22 +++++++++++++++------- be/src/vec/exec/join/process_hash_table_probe.h | 2 +- .../vec/exec/join/process_hash_table_probe_impl.h | 15 +++++++++++---- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h index 657f91d29ad..aed0d31b8d8 100644 --- a/be/src/vec/common/hash_table/hash_map.h +++ b/be/src/vec/common/hash_table/hash_map.h @@ -318,9 +318,15 @@ public: bool has_null_key() { return _has_null_key; } - void pre_build_idxs(std::vector<uint32>& bucksets) { - for (uint32_t i = 0; i < bucksets.size(); i++) { - bucksets[i] = first[bucksets[i]]; + void pre_build_idxs(std::vector<uint32>& bucksets, const uint8_t* null_map) { + if (null_map) { + for (uint32_t i = 0; i < bucksets.size(); i++) { + bucksets[i] = null_map[i] ? bucket_size : first[bucksets[i]]; + } + } else { + for (uint32_t i = 0; i < bucksets.size(); i++) { + bucksets[i] = first[bucksets[i]]; + } } } @@ -335,14 +341,14 @@ private: const auto batch_size = max_batch_size; while (probe_idx < probe_rows && matched_cnt < batch_size) { - auto build_idx = build_idx_map[probe_idx]; + auto build_idx = build_idx_map[probe_idx] == bucket_size ? 0 : build_idx_map[probe_idx]; while (build_idx && keys[probe_idx] != build_keys[build_idx]) { build_idx = next[build_idx]; } if constexpr (!with_other_conjuncts) { - if (!build_idx_map[probe_idx]) { + if (build_idx_map[probe_idx] == bucket_size) { // mark result as null when probe row is null mark_column->insert_null(); } else { @@ -389,7 +395,7 @@ private: while (probe_idx < probe_rows && matched_cnt < batch_size) { if constexpr (need_judge_null) { - if (!build_idx_map[probe_idx]) { + if (build_idx_map[probe_idx] == bucket_size) { probe_idx++; continue; } @@ -420,7 +426,9 @@ private: if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) { if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { - build_idxs[matched_cnt++] = build_idx; + probe_idxs[matched_cnt] = probe_idx; + build_idxs[matched_cnt] = build_idx; + matched_cnt++; } } else if (keys[probe_idx] == build_keys[build_idx]) { build_idxs[matched_cnt] = build_idx; diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h index 995c3992245..eff125ef22e 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -73,7 +73,7 @@ struct ProcessHashTableProbe { template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, - const uint8_t* null_map); + const uint8_t* null_map, bool need_judge_null); // Process full outer join/ right join / right semi/anti join to output the join result // in hash table diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 49c8b0dfbd7..f32762df541 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -125,7 +125,7 @@ template <int JoinOpType, typename Parent> template <typename HashTableType> typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_probe_side( HashTableType& hash_table_ctx, size_t probe_rows, bool with_other_join_conjuncts, - const uint8_t* null_map) { + const uint8_t* null_map, bool need_judge_null) { // may over batch size 1 for some outer join case _probe_indexs.resize(_batch_size + 1); _build_indexs.resize(_batch_size + 1); @@ -135,7 +135,8 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p hash_table_ctx.reset(); hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map, true, false, hash_table_ctx.hash_table->get_bucket_size()); - hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums); + hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums, + need_judge_null ? null_map : nullptr); } return typename HashTableType::State(_parent->_probe_columns); } @@ -156,8 +157,13 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash auto& build_index = _parent->_build_index; auto last_probe_index = probe_index; - _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, - need_null_map_for_probe ? null_map->data() : nullptr); + _init_probe_side<HashTableType>( + hash_table_ctx, probe_rows, with_other_conjuncts, + need_null_map_for_probe ? null_map->data() : nullptr, + need_null_map_for_probe && ignore_null && + (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || is_mark_join)); auto& mcol = mutable_block.mutable_columns(); @@ -313,6 +319,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts( output_block->get_by_position(result_column_id).column = std::move(new_filter_column); } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { + LOG(WARNING) << output_block->dump_data(); for (int i = 0; i < row_count; ++i) { visited[_build_indexs[i]] |= filter_column_ptr[i]; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org