This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 13652d6c6a0bdf6c76265509e989d90246fc5d28 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Tue Sep 12 18:16:20 2023 +0800 [fix](join) incorrect result of mark join (#24112) (#24238) --- be/src/vec/exec/join/vhash_join_node.cpp | 37 ++++++++++++++++++++-- .../nereids_syntax_p0/sub_query_correlated.out | 9 ++++++ .../nereids_syntax_p0/sub_query_correlated.groovy | 36 +++++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index b59027fc14..8b0dc5786b 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -519,7 +519,7 @@ Status HashJoinNode::close(RuntimeState* state) { bool HashJoinNode::need_more_input_data() const { return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos && - !_short_circuit_for_probe; + (!_short_circuit_for_probe || _is_mark_join); } void HashJoinNode::prepare_for_next() { @@ -530,10 +530,43 @@ void HashJoinNode::prepare_for_next() { Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_probe_timer); if (_short_circuit_for_probe) { + /// If `_short_circuit_for_probe` is true, this indicates no rows + /// match the join condition, and this is 'mark join', so we need to create a column as mark + /// with all rows set to 0. + if (_is_mark_join) { + auto block_rows = _probe_block.rows(); + if (block_rows == 0) { + *eos = _probe_eos; + return Status::OK(); + } + + Block temp_block; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + temp_block.insert(_probe_block.get_by_position(i)); + } + } + auto mark_column = ColumnUInt8::create(block_rows, 0); + temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""}); + + { + SCOPED_TIMER(_join_filter_timer); + RETURN_IF_ERROR( + VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns())); + } + + RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); + temp_block.clear(); + release_block_memory(_probe_block); + reached_limit(output_block, eos); + return Status::OK(); + } // If we use a short-circuit strategy, should return empty block directly. *eos = true; return Status::OK(); } + //TODO: this short circuit maybe could refactor, no need to check at here. if (_short_circuit_for_probe_and_additional_data) { // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN @@ -720,7 +753,7 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (_short_circuit_for_probe) { + if (_short_circuit_for_probe && !_is_mark_join) { // If we use a short-circuit strategy, should return empty block directly. *eos = true; return Status::OK(); diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out index 06cb3ca839..15f4567365 100644 --- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out +++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out @@ -192,6 +192,15 @@ 24 4 3 3 +-- !in_subquery_mark_with_order -- +1 \N +1 2 +1 3 +2 4 +2 5 +3 3 +3 4 + -- !exists_subquery_with_order -- 1 2 1 3 diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy index 8a1df18341..c7dcffb1be 100644 --- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy +++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy @@ -42,6 +42,14 @@ suite ("sub_query_correlated") { DROP TABLE IF EXISTS `sub_query_correlated_subquery5` """ + sql """ + DROP TABLE IF EXISTS `sub_query_correlated_subquery6` + """ + + sql """ + DROP TABLE IF EXISTS `sub_query_correlated_subquery7` + """ + sql """ create table if not exists sub_query_correlated_subquery1 (k1 bigint, k2 bigint) @@ -82,6 +90,21 @@ suite ("sub_query_correlated") { properties('replication_num' = '1') """ + sql """ + create table if not exists sub_query_correlated_subquery6 + (k1 bigint, k2 bigint) + duplicate key(k1) + distributed by hash(k2) buckets 1 + properties('replication_num' = '1') + """ + + sql """ + create table if not exists sub_query_correlated_subquery7 + (k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint) + distributed by hash(k2) buckets 1 + properties('replication_num' = '1'); + """ + sql """ insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4) """ @@ -103,6 +126,15 @@ suite ("sub_query_correlated") { insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), (5,4), (6,7), (8,9) """ + sql """ + insert into sub_query_correlated_subquery6 values (1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null); + """ + + sql """ + insert into sub_query_correlated_subquery7 values (1,"abc",2,3,4), (1,"abcd",3,3,4), (2,"xyz",2,4,2), + (2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), (3,"abc",4,5,3), (null,null,null,null,null); + """ + sql "SET enable_fallback_to_original_planner=false" //------------------Correlated----------------- @@ -261,6 +293,10 @@ suite ("sub_query_correlated") { select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2); """ + order_qt_in_subquery_mark_with_order """ + select * from sub_query_correlated_subquery6 where sub_query_correlated_subquery6.k1 not in (select sub_query_correlated_subquery7.k3 from sub_query_correlated_subquery7 ) or k1 < 10; + """ + order_qt_exists_subquery_with_order """ select * from sub_query_correlated_subquery1 where exists (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2); """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org