This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 966865c7d45 [fix](join)Consider mark join when computing right_col_idx (#50720) 966865c7d45 is described below commit 966865c7d45f74ca755394a2b26d62a028f6855e Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Wed May 14 23:28:16 2025 +0800 [fix](join)Consider mark join when computing right_col_idx (#50720) ### What problem does this PR solve? If there is a mark join condition, then even in a right semi join, the columns from the left table involved in the mark join condition will still appear in the intermediate tuple. This PR also completes the missing handling logic for right semi joins with mark join condition. ``` ==2126730==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x503004ee02e0 at pc 0x55f4a7d3f354 bp 0x7f6b6b587e70 sp 0x7f6b6b587e68 READ of size 8 at 0x503004ee02e0 thread T1296 (brpc_light) #0 0x55f4a7d3f353 in std::__shared_ptr<doris::vectorized::IDataType const, (__gnu_cxx::_Lock_policy)2>::__shared_ptr(std::__shared_ptr<doris::vectorized::IDataType const, (__gnu_cxx::_Lock_policy)2> const&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/shared_ptr_base.h:1522:7 #1 0x55f4a7d3f01e in std::shared_ptr<doris::vectorized::IDataType const>::shared_ptr(std::shared_ptr<doris::vectorized::IDataType const> const&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/shared_ptr.h:204:7 #2 0x55f4f8aa231c in doris::pipeline::HashJoinProbeOperatorX::prepare(doris::RuntimeState*) /root/doris/be/src/pipeline/exec/hashjoin_probe_operator.cpp:577:33 #3 0x55f4fb1e4a28 in doris::pipeline::Pipeline::prepare(doris::RuntimeState*) /root/doris/be/src/pipeline/pipeline.cpp:89:5 #4 0x55f4fb089d3f in doris::pipeline::PipelineFragmentContext::prepare(doris::TPipelineFragmentParams const&, doris::ThreadPool*) /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:352:9 #5 0x55f4ad615a5c in doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&, doris::QuerySource, std::function<void (doris::RuntimeState*, doris::Status*)> const&, doris::TPipelineFragmentParamsList const&) /root/doris/be/src/runtime/fragment_mgr.cpp:855:9 #6 0x55f4ad613ca6 in doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&, doris::QuerySource, doris::TPipelineFragmentParamsList const&) /root/doris/be/src/runtime/fragment_mgr.cpp:634:16 #7 0x55f4ae2e867c in doris::PInternalService::_exec_plan_fragment_impl(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>> const&, doris::PFragmentRequestVersion, bool, std::function<void (doris::RuntimeState*, doris::Status*)> const&) /root/doris/be/src/service/internal_service.cpp:613:17 #8 0x55f4ae2e4b67 in doris::PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*) /root/doris/be/src/service/internal_service.cpp:343:14 #9 0x55f4ae31d4e1 in doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0::operator()() const /root/doris/be/src/service/internal_service.cpp:367:9 #10 0x55f4ae31d33e in void std::__invoke_impl<void, doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0&>(std::__invoke_other, doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0&) /root/ldb_toolchain_robi [...] #11 0x55f4ae31d27e in std::enable_if<is_invocable_r_v<void, doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0&>, void>::type std::__invoke_r<void, doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0&>(do [...] #12 0x55f4ae31cf55 in std::_Function_handler<void (), doris::PInternalService::exec_plan_fragment_prepare(google::protobuf::RpcController*, doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290:9 #13 0x55f4a74175af in std::function<void ()>::operator()() const /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591:9 #14 0x55f4ae3c21c4 in doris::WorkThreadPool<false>::work_thread(int) /root/doris/be/src/util/work_thread_pool.hpp:158:17 #15 0x55f4ae3c4cc8 in void std::__invoke_impl<void, void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>(std::__invoke_memfun_deref, void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74:14 #16 0x55f4ae3c4a92 in std::__invoke_result<void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>::type std::__invoke<void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>(void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96:14 #17 0x55f4ae3c49f8 in decltype(std::__invoke((*this)._M_pmf, std::forward<doris::WorkThreadPool<false>*&>(fp), std::forward<int&>(fp))) std::_Mem_fn_base<void (doris::WorkThreadPool<false>::*)(int), true>::operator()<doris::WorkThreadPool<false>*&, int&>(doris::WorkThreadPool<false>*&, int&) const /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:170:11 #18 0x55f4ae3c4942 in void std::__invoke_impl<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>(std::__invoke_other, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61:14 #19 0x55f4ae3c4752 in std::enable_if<is_invocable_r_v<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>, void>::type std::__invoke_r<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>(std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/inv [...] #20 0x55f4ae3c4664 in void std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>::__call<void, 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:654:11 #21 0x55f4ae3c4395 in void std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>::operator()<>() /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:713:17 #22 0x55f4ae3c428e in void std::__invoke_impl<void, std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>(std::__invoke_other, std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>&&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61:14 #23 0x55f4ae3c41ce in std::__invoke_result<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>::type std::__invoke<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>(std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>&&) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu [...] #24 0x55f4ae3c417b in void std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:292:13 #25 0x55f4ae3c40f6 in std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>::operator()() /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:299:11 #26 0x55f4ae3c3f34 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>>::_M_run() /root/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:244:13 #27 0x55f4fea05d2e in execute_native_thread_routine pthread_atfork.c #28 0x55f4a7162e0a in asan_thread_start(void*) crtstuff.c #29 0x7f7149c421c9 in start_thread (/lib64/libpthread.so.0+0x81c9) (BuildId: 7c4add5c7a885e6ff4ce17867d6a2286e4420eec) #30 0x7f714a6318d2 in clone (/lib64/libc.so.6+0x398d2) (BuildId: 4ee3325955e3b55b6805f33959b7cb77745ad625) ``` --- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 29 ++- be/src/pipeline/exec/hashjoin_probe_operator.h | 3 + .../pipeline/exec/join/process_hash_table_probe.h | 10 +- .../exec/join/process_hash_table_probe_impl.h | 134 +++++++++---- be/src/vec/common/hash_table/join_hash_table.h | 11 +- .../operator/hashjoin_probe_operator_test.cpp | 28 +++ .../data/query_p0/join/mark_join/mark_join.out | Bin 239 -> 1009 bytes .../query_p0/join/mark_join/mark_join.groovy | 215 +++++++++++++++++++++ 8 files changed, 381 insertions(+), 49 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index a5d195a2d8c..476503f187e 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -568,14 +568,35 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { } } - const size_t right_col_idx = - (_is_right_semi_anti && !_have_other_join_conjunct) ? 0 : _left_table_data_types.size(); + _right_col_idx = (_is_right_semi_anti && !_have_other_join_conjunct && + (!_is_mark_join || _mark_join_conjuncts.empty())) + ? 0 + : _left_table_data_types.size(); + size_t idx = 0; for (const auto* slot : slots_to_check) { auto data_type = slot->get_data_type_ptr(); - const auto slot_on_left = idx < right_col_idx; + const auto slot_on_left = idx < _right_col_idx; + + if (slot_on_left) { + if (idx >= _left_table_data_types.size()) { + return Status::InternalError( + "Join node(id={}, OP={}) intermediate slot({}, #{})'s on left table " + "idx out bound of _left_table_data_types: {} vs {}", + _node_id, _join_op, slot->col_name(), slot->id(), idx, + _left_table_data_types.size()); + } + } else if (idx - _right_col_idx >= _right_table_data_types.size()) { + return Status::InternalError( + "Join node(id={}, OP={}) intermediate slot({}, #{})'s on right table " + "idx out bound of _right_table_data_types: {} vs {}(idx = {}, _right_col_idx = " + "{})", + _node_id, _join_op, slot->col_name(), slot->id(), idx - _right_col_idx, + _right_table_data_types.size(), idx, _right_col_idx); + } + auto target_data_type = slot_on_left ? _left_table_data_types[idx] - : _right_table_data_types[idx - right_col_idx]; + : _right_table_data_types[idx - _right_col_idx]; ++idx; if (data_type->equals(*target_data_type)) { continue; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 51758d8b8fb..7cbe4434112 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -191,6 +191,9 @@ private: std::set<int> _should_not_lazy_materialized_column_ids; std::vector<std::string> _right_table_column_names; const std::vector<TExpr> _partition_exprs; + + // Index of column(slot) from right table in the `_intermediate_row_desc`. + size_t _right_col_idx; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 4fde7ed5fea..535ca74997d 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -22,6 +22,7 @@ #include "vec/columns/column.h" #include "vec/columns/columns_number.h" #include "vec/common/arena.h" +#include "vec/common/custom_allocator.h" namespace doris { namespace vectorized { @@ -119,8 +120,15 @@ struct ProcessHashTableProbe { RuntimeProfile::Counter* _probe_side_output_timer = nullptr; RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr; - size_t _right_col_idx; + // See `HashJoinProbeOperatorX::_right_col_idx` + const size_t _right_col_idx; + size_t _right_col_len; + + // For right semi with mark join conjunct, we need to store the mark join flags + // in the hash table. + // -1 means null, 0 means false, 1 means true + DorisVector<int8_t> mark_join_flags; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index df7ad9456bb..62fa5505d81 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -70,9 +70,7 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState _build_side_output_timer(parent->_build_side_output_timer), _probe_side_output_timer(parent->_probe_side_output_timer), _finish_probe_phase_timer(parent->_finish_probe_phase_timer), - _right_col_idx((_parent_operator->_is_right_semi_anti && !_have_other_join_conjunct) - ? 0 - : _parent_operator->_left_table_data_types.size()), + _right_col_idx(_parent_operator->_right_col_idx), _right_col_len(_parent_operator->_right_table_data_types.size()) {} template <int JoinOpType> @@ -272,7 +270,7 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, build_side_output_column(mcol, is_mark_join); - if (_have_other_join_conjunct || + if (_have_other_join_conjunct || !_parent->_mark_join_conjuncts.empty() || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { probe_side_output_column(mcol); } @@ -281,7 +279,7 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, DCHECK_EQ(current_offset, output_block->rows()); COUNTER_UPDATE(_parent->_intermediate_rows_counter, current_offset); - if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + if (is_mark_join) { bool ignore_null_map = (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) && @@ -409,15 +407,20 @@ Status ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter( template <int JoinOpType> Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* output_block, const uint8_t* null_map) { - DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN); + if (JoinOpType != TJoinOp::LEFT_ANTI_JOIN && JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && + JoinOpType != TJoinOp::LEFT_SEMI_JOIN && JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { + return Status::InternalError("join type {} is not supported", JoinOpType); + } constexpr bool is_anti_join = JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN; constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + constexpr bool is_right_half_join = + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType == TJoinOp::RIGHT_ANTI_JOIN; + const auto row_count = output_block->rows(); if (!row_count) { return Status::OK(); @@ -488,37 +491,77 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo } } + if constexpr (is_right_half_join) { + if (mark_join_flags.empty() && _build_block != nullptr) { + mark_join_flags.resize(_build_block->rows(), 0); + } + } + auto filter_column = vectorized::ColumnUInt8::create(row_count, 0); auto* __restrict filter_map = filter_column->get_data().data(); for (size_t i = 0; i != row_count; ++i) { - if (_parent->_last_probe_match == _probe_indexs.get_element(i)) { - continue; - } - if (_build_indexs.get_element(i) == 0) { - bool has_null_mark_value = - _parent->_last_probe_null_mark == _probe_indexs.get_element(i); - filter_map[i] = true; - mark_filter_data[i] = false; - mark_null_map[i] |= has_null_mark_value; - } else if (mark_null_map[i]) { - _parent->_last_probe_null_mark = _probe_indexs.get_element(i); - } else if (mark_filter_data[i]) { - filter_map[i] = true; - _parent->_last_probe_match = _probe_indexs.get_element(i); + if constexpr (is_right_half_join) { + const auto& build_index = _build_indexs.get_element(i); + if (build_index == 0) { + continue; + } + + if (mark_join_flags[build_index] == 1) { + continue; + } + + if (mark_null_map[i]) { + mark_join_flags[build_index] = -1; + } else if (mark_filter_data[i]) { + mark_join_flags[build_index] = 1; + } + } else { + if (_parent->_last_probe_match == _probe_indexs.get_element(i)) { + continue; + } + if (_build_indexs.get_element(i) == 0) { + bool has_null_mark_value = + _parent->_last_probe_null_mark == _probe_indexs.get_element(i); + filter_map[i] = true; + mark_filter_data[i] = false; + mark_null_map[i] |= has_null_mark_value; + } else if (mark_null_map[i]) { + _parent->_last_probe_null_mark = _probe_indexs.get_element(i); + } else if (mark_filter_data[i]) { + filter_map[i] = true; + _parent->_last_probe_match = _probe_indexs.get_element(i); + } } } - if constexpr (is_anti_join) { - // flip the mark column - for (size_t i = 0; i != row_count; ++i) { - mark_filter_data[i] ^= 1; // not null/ null + if constexpr (is_right_half_join) { + if constexpr (is_anti_join) { + // flip the mark column + for (size_t i = 0; i != row_count; ++i) { + if (mark_join_flags[i] == -1) { + // -1 means null. + continue; + } + + mark_join_flags[i] ^= 1; + } + } + // For right semi/anti join, no rows will be output in probe phase. + output_block->clear_column_data(); + return Status::OK(); + } else { + if constexpr (is_anti_join) { + // flip the mark column + for (size_t i = 0; i != row_count; ++i) { + mark_filter_data[i] ^= 1; // not null/ null + } } - } - auto result_column_id = output_block->columns(); - output_block->insert( - {std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); - return finalize_block_with_filter(output_block, result_column_id, result_column_id); + auto result_column_id = output_block->columns(); + output_block->insert( + {std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); + return finalize_block_with_filter(output_block, result_column_id, result_column_id); + } } template <int JoinOpType> @@ -675,8 +718,31 @@ Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab } } + if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + if (is_mark_join) { + if (mark_join_flags.empty() && _build_block != nullptr) { + mark_join_flags.resize(_build_block->rows(), 0); + } + + // mark column is nullable + auto* mark_column = assert_cast<vectorized::ColumnNullable*>( + mcol[_parent->_mark_column_id].get()); + mark_column->resize(block_size); + auto* null_map = mark_column->get_null_map_data().data(); + auto* data = assert_cast<vectorized::ColumnUInt8&>(mark_column->get_nested_column()) + .get_data() + .data(); + for (size_t i = 0; i != block_size; ++i) { + const auto build_index = _build_indexs.get_element(i); + null_map[i] = mark_join_flags[build_index] == -1; + data[i] = mark_join_flags[build_index] == 1; + } + } + } + // just resize the left table column in case with other conjunct to make block size is not zero - if (_parent_operator->_is_right_semi_anti && _have_other_join_conjunct) { + if (_parent_operator->_is_right_semi_anti && _right_col_idx != 0) { for (int i = 0; i < _right_col_idx; ++i) { mcol[i]->resize(block_size); } diff --git a/be/src/vec/common/hash_table/join_hash_table.h b/be/src/vec/common/hash_table/join_hash_table.h index 61dc8b1e182..86ce9854a91 100644 --- a/be/src/vec/common/hash_table/join_hash_table.h +++ b/be/src/vec/common/hash_table/join_hash_table.h @@ -106,7 +106,7 @@ public: keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs); } - if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + if (is_mark_join) { bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN; bool is_left_half_join = @@ -292,15 +292,6 @@ private: auto do_the_probe = [&]() { while (build_idx && matched_cnt < batch_size) { - if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) { - probe_idxs[matched_cnt] = probe_idx; - build_idxs[matched_cnt] = build_idx; - matched_cnt++; - } - } - if (keys[probe_idx] == build_keys[build_idx]) { build_idxs[matched_cnt] = build_idx; probe_idxs[matched_cnt] = probe_idx; diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp index ebc5795dd66..bf0e2902344 100644 --- a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp +++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp @@ -798,6 +798,34 @@ TEST_F(HashJoinProbeOperatorTest, RightSemiJoin) { check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"}); } +TEST_F(HashJoinProbeOperatorTest, RightSemiJoinMarkJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::RIGHT_SEMI_JOIN, + .is_mark_join = true, + .mark_join_conjuncts_size = 1}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 5); + + check_column_values(*sorted_block.get_by_position(2).column, {1, 2, 3, 4, 5}); + check_column_values(*sorted_block.get_by_position(3).column, {Null(), "b", "c", "d", Null()}); + check_column_values(*sorted_block.get_by_position(4).column, {0, Null(), 1, 1, 0}); +} + TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin) { auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( diff --git a/regression-test/data/query_p0/join/mark_join/mark_join.out b/regression-test/data/query_p0/join/mark_join/mark_join.out index ed3575d0e14..f4ac9204b47 100644 Binary files a/regression-test/data/query_p0/join/mark_join/mark_join.out and b/regression-test/data/query_p0/join/mark_join/mark_join.out differ diff --git a/regression-test/suites/query_p0/join/mark_join/mark_join.groovy b/regression-test/suites/query_p0/join/mark_join/mark_join.groovy index 9759a0e9b4c..6b0a9d938c2 100644 --- a/regression-test/suites/query_p0/join/mark_join/mark_join.groovy +++ b/regression-test/suites/query_p0/join/mark_join/mark_join.groovy @@ -61,4 +61,219 @@ suite("mark_join") { qt_test """ select * from t1 where t1.k1 not in (select t2.k3 from t2 where t2.k2 = t1.k2) or k1 < 10 order by k1, k2; """ + + + sql "drop table if exists tbl1;" + sql "drop table if exists tbl2;" + sql "drop table if exists tbl3;" + + sql """ + CREATE TABLE `tbl1` ( + `unit_name` varchar(1080) NULL, + `cur_unit_name` varchar(1080) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`unit_name`) + DISTRIBUTED BY RANDOM BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE TABLE `tbl2` ( + `org_code` varchar(150) NOT NULL , + `org_name` varchar(300) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`org_code`) + DISTRIBUTED BY HASH(`org_code`) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE TABLE `tbl3` ( + `id` bigint NOT NULL, + `acntm_name` varchar(500) NULL , + `vendor_name` varchar(500) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + insert into tbl1 (unit_name, cur_unit_name) values + ('v1', 'o1'), + ('v2', 'o2'), + ('v3', 'o3'), + ('v4', 'o4'), + ('v5', 'o5'), + (null, 'o1'), + ('v1', 'o1'), + ('v2', 'o2'), + ('v3', 'o3'), + ('v4', 'o4'), + ('v5', 'o5'), + (null, 'o1'), + (null, 'o2'), + (null, 'o3'), + (null, 'o4'), + (null, 'o5'), + ('v1', 'o1'), + ('v2', 'o2'), + ('v3', 'o3'), + ('v4', 'o4'), + ('v5', 'o5'); + """ + + sql """ + insert into tbl2(org_code, org_name) values + ('v1', 'o1'), + ('v2', 'o2'), + ('v3', 'o3'), + ('v4', 'o4'), + ('v5', 'o5'), + ('v1', null), + ('v2', null), + ('v3', null), + ('v4', null), + ('v5', null); + """ + + sql """ + insert into tbl3 (id, vendor_name, acntm_name) + values(1, 'o1', 'v1'), + (2, 'o2', 'v2'), + (3, 'o3', 'v3'), + (4, 'o4', 'v4'), + (5, 'o5', 'v5'), + (6, null, 'v1'), + (7, null, 'v2'), + (8, null, 'v3'), + (9, null, 'v4'), + (10, null, 'v5'); + """ + + sql " analyze table tbl1 with sync;" + sql " analyze table tbl2 with sync;" + sql " analyze table tbl3 with sync;" + + sql "set disable_join_reorder=0;" + qt_test_right_semi_mark_join """ + select + tbl3.id, + tbl3.acntm_name, + tbl3.vendor_name, + tbl3.vendor_name in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + ) v1, + tbl3.vendor_name not in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + ) v2 + from + tbl3 order by 1,2,3,4,5; + """ + + sql "set disable_join_reorder=1;" + qt_test_right_semi_mark_join_2 """ + select + tbl3.id, + tbl3.acntm_name, + tbl3.vendor_name, + tbl3.vendor_name in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + ) v1, + tbl3.vendor_name not in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + ) v2 + from + tbl3 order by 1,2,3,4,5; + """ + + sql "set disable_join_reorder=0;" + qt_test_right_semi_mark_join_no_null """ + select + tbl3.id, + tbl3.acntm_name, + tbl3.vendor_name, + tbl3.vendor_name in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + and tbl1.unit_name is not null + ) v1, + tbl3.vendor_name not in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + and tbl1.unit_name is not null + ) v2 + from + tbl3 order by 1,2,3,4,5; + """ + + sql "set disable_join_reorder=1;" + qt_test_right_semi_mark_join_no_null_2 """ + select + tbl3.id, + tbl3.acntm_name, + tbl3.vendor_name, + tbl3.vendor_name in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + and tbl1.unit_name is not null + ) v1, + tbl3.vendor_name not in ( + select + tbl1.unit_name + from + tbl2 + join tbl1 on tbl1.cur_unit_name = tbl2.org_name + where + tbl2.org_code = tbl3.acntm_name + and tbl1.unit_name is not null + ) v2 + from + tbl3 order by 1,2,3,4,5; + """ } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org