This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7513c82431 [NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608) 7513c82431 is described below commit 7513c82431b4aa2c6d8ab45ddfb6874e3c4fa7fd Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Nov 29 08:55:54 2022 +0800 [NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608) --- be/src/util/runtime_profile.h | 2 +- be/src/vec/exec/join/vhash_join_node.cpp | 65 +++++++--------- be/src/vec/exec/join/vhash_join_node.h | 15 +--- be/src/vec/exec/join/vjoin_node_base.cpp | 12 +++ be/src/vec/exec/join/vjoin_node_base.h | 9 +++ be/src/vec/exec/join/vnested_loop_join_node.cpp | 87 ++++++++++++++++++++-- be/src/vec/exec/join/vnested_loop_join_node.h | 6 ++ .../doris/analysis/BitmapFilterPredicate.java | 2 +- .../glue/translator/PhysicalPlanTranslator.java | 5 +- .../org/apache/doris/planner/HashJoinNode.java | 34 ++------- .../org/apache/doris/planner/JoinNodeBase.java | 32 +++++++- .../apache/doris/planner/NestedLoopJoinNode.java | 64 +++++++++------- .../apache/doris/planner/SingleNodePlanner.java | 7 +- gensrc/thrift/PlanNodes.thrift | 2 + .../query_p0/join/test_nestedloop_outer_join.out | 12 +++ .../join/test_nestedloop_outer_join.groovy | 16 ++++ 16 files changed, 249 insertions(+), 121 deletions(-) diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 55b4b2c3d2..62f0365afc 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -539,7 +539,7 @@ public: if (counter == nullptr) { return; } - DCHECK(counter->type() == TUnit::TIME_NS); + DCHECK_EQ(counter->type(), TUnit::TIME_NS); _sw.start(); } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 2cce0fdff5..35b83e2a8d 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -343,11 +343,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), _left_output_slot_flags); init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), _right_output_slot_flags); - // only use in outer join as the bool column to mark for function of `tuple_is_null` - if (_is_outer_join) { - _tuple_is_null_left_flag_column = ColumnUInt8::create(); - _tuple_is_null_right_flag_column = ColumnUInt8::create(); - } return Status::OK(); } @@ -565,7 +560,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo return Status::OK(); } - _add_tuple_is_null_column(&temp_block); + if (_is_outer_join) { + _add_tuple_is_null_column(&temp_block); + } { SCOPED_TIMER(_join_filter_timer); RETURN_IF_ERROR( @@ -578,6 +575,30 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo return st; } +void HashJoinNode::_add_tuple_is_null_column(Block* block) { + DCHECK(_is_outer_join); + auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); + auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); + auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0); + auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1); + auto left_size = left_null_map.size(); + auto right_size = right_null_map.size(); + + if (left_size == 0) { + DCHECK_EQ(right_size, block->rows()); + left_null_map.get_data().resize_fill(right_size, 0); + } + if (right_size == 0) { + DCHECK_EQ(left_size, block->rows()); + right_null_map.get_data().resize_fill(left_size, 0); + } + + block->insert( + {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), "left_tuples_is_null"}); + block->insert( + {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), "right_tuples_is_null"}); +} + void HashJoinNode::_prepare_probe_block() { // clear_column_data of _probe_block if (!_probe_column_disguise_null.empty()) { @@ -1051,38 +1072,6 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) { return results; } -void HashJoinNode::_add_tuple_is_null_column(Block* block) { - if (_is_outer_join) { - auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); - auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); - auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0); - auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1); - auto left_size = left_null_map.size(); - auto right_size = right_null_map.size(); - - if (left_size == 0) { - DCHECK_EQ(right_size, block->rows()); - left_null_map.get_data().resize_fill(right_size, 0); - } - if (right_size == 0) { - DCHECK_EQ(left_size, block->rows()); - right_null_map.get_data().resize_fill(left_size, 0); - } - - block->insert({std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), - "left_tuples_is_null"}); - block->insert({std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), - "right_tuples_is_null"}); - } -} - -void HashJoinNode::_reset_tuple_is_null_column() { - if (_is_outer_join) { - reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear(); - reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear(); - } -} - HashJoinNode::~HashJoinNode() { if (_shared_hashtable_controller && _should_build_hash_table) { _shared_hashtable_controller->signal(id()); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 1a35bb2652..a41d3c2f7b 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -251,8 +251,6 @@ private: RuntimeProfile::Counter* _build_side_compute_hash_timer; RuntimeProfile::Counter* _build_side_merge_block_timer; - RuntimeProfile::Counter* _join_filter_timer; - RuntimeProfile* _build_phase_profile; int64_t _mem_used; @@ -289,12 +287,8 @@ private: std::vector<bool> _left_output_slot_flags; std::vector<bool> _right_output_slot_flags; - MutableColumnPtr _tuple_is_null_left_flag_column; - MutableColumnPtr _tuple_is_null_right_flag_column; - SharedHashTableContextPtr _shared_hash_table_context = nullptr; -private: Status _materialize_build_side(RuntimeState* state) override; Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset); @@ -317,16 +311,13 @@ private: void _prepare_probe_block(); - // add tuple is null flag column to Block for filter conjunct and output expr - void _add_tuple_is_null_column(Block* block); - - // reset the tuple is null flag column for the next call - void _reset_tuple_is_null_column(); - static std::vector<uint16_t> _convert_block_to_null(Block& block); void _release_mem(); + // add tuple is null flag column to Block for filter conjunct and output expr + void _add_tuple_is_null_column(Block* block) override; + template <class HashTableContext> friend struct ProcessHashTableBuild; diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 731b383340..d5d5280f57 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -144,6 +144,11 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) { _output_expr_ctxs.push_back(ctx); } } + // only use in outer join as the bool column to mark for function of `tuple_is_null` + if (_is_outer_join) { + _tuple_is_null_left_flag_column = ColumnUInt8::create(); + _tuple_is_null_right_flag_column = ColumnUInt8::create(); + } return ExecNode::init(tnode, state); } @@ -173,6 +178,13 @@ Status VJoinNodeBase::open(RuntimeState* state) { return status; } +void VJoinNodeBase::_reset_tuple_is_null_column() { + if (_is_outer_join) { + reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear(); + reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear(); + } +} + void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJoinNodeBase::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 81869c373d..f2bdc6eced 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -68,6 +68,11 @@ protected: // Initialize the join operation. void _init_join_op(); + virtual void _add_tuple_is_null_column(Block* block) = 0; + + // reset the tuple is null flag column for the next call + void _reset_tuple_is_null_column(); + // Materialize build relation. For HashJoin, it will build a hash table while a list of build blocks for NLJoin. virtual Status _materialize_build_side(RuntimeState* state) = 0; @@ -97,12 +102,16 @@ protected: Block _join_block; + MutableColumnPtr _tuple_is_null_left_flag_column; + MutableColumnPtr _tuple_is_null_right_flag_column; + RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _probe_timer; RuntimeProfile::Counter* _build_rows_counter; RuntimeProfile::Counter* _probe_rows_counter; RuntimeProfile::Counter* _push_down_timer; RuntimeProfile::Counter* _push_compute_timer; + RuntimeProfile::Counter* _join_filter_timer; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 356a9a7a5a..246122d5df 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -83,6 +83,12 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only; } + if (tnode.nested_loop_join_node.__isset.vjoin_conjunct) { + _vjoin_conjunct_ptr.reset(new VExprContext*); + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, tnode.nested_loop_join_node.vjoin_conjunct, + _vjoin_conjunct_ptr.get())); + } + std::vector<TExpr> filter_src_exprs; for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr); @@ -106,6 +112,7 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) { _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime"); _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); + _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); // pre-compute the tuple index of build tuples in the output row int num_build_tuples = child(1)->row_desc().tuple_descriptors().size(); @@ -116,6 +123,9 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) { RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx); } + if (_vjoin_conjunct_ptr) { + RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->prepare(state, *_intermediate_row_desc)); + } _num_probe_side_columns = child(0)->row_desc().num_materialized_slots(); _num_build_side_columns = child(1)->row_desc().num_materialized_slots(); RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); @@ -132,6 +142,7 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) { } START_AND_SCOPE_SPAN(state->get_tracer(), span, "VNestedLoopJoinNode::close"); VExpr::close(_filter_src_expr_ctxs, state); + if (_vjoin_conjunct_ptr) (*_vjoin_conjunct_ptr)->close(state); _release_mem(); return VJoinNodeBase::close(state); @@ -191,9 +202,9 @@ Status VNestedLoopJoinNode::get_left_side(RuntimeState* state, Block* block) { Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VNestedLoopJoinNode::get_next"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_probe_timer); RETURN_IF_CANCELLED(state); - SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); if (_is_output_left_side_only) { @@ -249,6 +260,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo Status status = _do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag>( &tmp_block, offset_stack, !_is_left_semi_anti); + _update_tuple_is_null_column(&tmp_block); if (!status.OK()) { return status; } @@ -277,6 +289,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo Status status = _do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag>( &tmp_block, offset_stack, !_is_right_semi_anti); + _update_tuple_is_null_column(&tmp_block); mutable_join_block = MutableBlock(std::move(tmp_block)); if (!status.OK()) { return status; @@ -300,7 +313,16 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo : _matched_rows_done; Block tmp_block = mutable_join_block.to_block(0); + if (_is_outer_join) { + _add_tuple_is_null_column(&tmp_block); + } + { + SCOPED_TIMER(_join_filter_timer); + RETURN_IF_ERROR( + VExprContext::filter_block(_vconjunct_ctx_ptr, &tmp_block, tmp_block.columns())); + } RETURN_IF_ERROR(_build_output_block(&tmp_block, block)); + _reset_tuple_is_null_column(); reached_limit(block, eos); return Status::OK(); } @@ -344,8 +366,38 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns, } } +void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) { + if (_is_outer_join) { + auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); + auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); + auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0); + auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1); + auto left_size = left_null_map.size(); + auto right_size = right_null_map.size(); + + if (left_size < block->rows()) { + left_null_map.get_data().resize_fill(block->rows(), 0); + } + if (right_size < block->rows()) { + right_null_map.get_data().resize_fill(block->rows(), 0); + } + } +} + +void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) { + DCHECK(_is_outer_join); + auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); + auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); + block->insert( + {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), "left_tuples_is_null"}); + block->insert( + {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), "right_tuples_is_null"}); +} + template <bool BuildSide, bool IsSemi> void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, size_t batch_size) { + DCHECK_GT(dst_columns.size(), 0); + auto pre_size = dst_columns[0]->size(); if constexpr (BuildSide) { auto build_block_sz = _build_blocks.size(); size_t i = _output_null_idx_build_side; @@ -378,6 +430,15 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s dst_columns[j]->insert_many_defaults(selector_idx); } + if (_is_outer_join) { + reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get()) + ->get_data() + .resize_fill(pre_size + selector_idx, 1); + reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get()) + ->get_data() + .resize_fill(pre_size + selector_idx, 0); + } + for (size_t j = 0; j < _num_build_side_columns; ++j) { auto src_column = cur_block.get_by_position(j); if (!src_column.column->is_nullable() && @@ -403,6 +464,7 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s i++; break; } + pre_size = dst_columns[0]->size(); } _output_null_idx_build_side = i; } else { @@ -437,6 +499,14 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s for (size_t i = 0; i < _num_build_side_columns; ++i) { dst_columns[_num_probe_side_columns + i]->insert_default(); } + if (_is_outer_join) { + reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get()) + ->get_data() + .resize_fill(pre_size + 1, 0); + reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get()) + ->get_data() + .resize_fill(pre_size + 1, 1); + } } } @@ -457,10 +527,10 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags( size_t build_block_idx = _current_build_pos == 0 ? _build_blocks.size() - 1 : _current_build_pos - 1; size_t processed_blocks_num = offset_stack.size(); - if (LIKELY(_vconjunct_ctx_ptr != nullptr && block->rows() > 0)) { - DCHECK((*_vconjunct_ctx_ptr) != nullptr); + if (LIKELY(_vjoin_conjunct_ptr != nullptr && block->rows() > 0)) { + DCHECK((*_vjoin_conjunct_ptr) != nullptr); int result_column_id = -1; - RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->execute(block, &result_column_id)); + RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->execute(block, &result_column_id)); ColumnPtr filter_column = block->get_by_position(result_column_id).column; if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { ColumnPtr nested_column = nullable_column->get_nested_column_ptr(); @@ -512,7 +582,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags( bool ret = const_column->get_bool(0); if (!ret) { for (size_t i = 0; i < column_to_keep; ++i) { - std::move(*block->get_by_position(i).column).assume_mutable()->clear(); + block->get_by_position(i).column->assume_mutable()->clear(); } } else { if constexpr (SetBuildSideFlag) { @@ -569,6 +639,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags( } } #undef CLEAR_BLOCK + Block::erase_useless_column(block, column_to_keep); return Status::OK(); } @@ -576,6 +647,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VNestedLoopJoinNode::open") SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(VExpr::open(_filter_src_expr_ctxs, state)); + if (_vjoin_conjunct_ptr) { + RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state)); + } RETURN_IF_ERROR(VJoinNodeBase::open(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); RETURN_IF_CANCELLED(state); @@ -602,6 +676,9 @@ void VNestedLoopJoinNode::_release_mem() { MutableColumns tmp_build_side_visited_flags; _build_side_visited_flags.swap(tmp_build_side_visited_flags); + + _tuple_is_null_left_flag_column = nullptr; + _tuple_is_null_right_flag_column = nullptr; } } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 7f47b7959e..45644f9e8e 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -83,6 +83,11 @@ private: Status get_left_side(RuntimeState* state, Block* block); + // add tuple is null flag column to Block for filter conjunct and output expr + void _update_tuple_is_null_column(Block* block); + + void _add_tuple_is_null_column(Block* block) override; + // List of build blocks, constructed in prepare() Blocks _build_blocks; // Visited flags for each row in build side. @@ -115,6 +120,7 @@ private: std::vector<TRuntimeFilterDesc> _runtime_filter_descs; std::vector<vectorized::VExprContext*> _filter_src_expr_ctxs; bool _is_output_left_side_only = false; + std::unique_ptr<VExprContext*> _vjoin_conjunct_ptr; friend struct RuntimeFilterBuild; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java index d7e0463016..1802f0917b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java @@ -90,7 +90,7 @@ public class BitmapFilterPredicate extends Predicate { @Override protected void toThrift(TExprNode msg) { - // Unreachable + Preconditions.checkArgument(false, "`toThrift` in BitmapFilterPredicate should not be reached!"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index ea368bfedf..235f0265b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -896,8 +896,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot()); connectChildFragment(nestedLoopJoinNode, 1, leftFragment, rightFragment, context); leftFragment.setPlanRoot(nestedLoopJoinNode); - nestedLoopJoin.getOtherJoinConjuncts().stream() - .map(e -> ExpressionTranslator.translate(e, context)).forEach(nestedLoopJoinNode::addConjunct); + List<Expr> joinConjuncts = nestedLoopJoin.getOtherJoinConjuncts().stream() + .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); + nestedLoopJoinNode.setJoinConjuncts(joinConjuncts); if (nestedLoopJoin.isShouldTranslateOutput()) { // translate output expr on intermediate tuple diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index ac86bf7bd7..8eb8e85256 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -35,7 +35,6 @@ import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.CheckedMath; -import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; @@ -266,38 +265,16 @@ public class HashJoinNode extends JoinNodeBase { } } - // output slots + predicate slots = input slots @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { - Set<SlotId> result = Sets.newHashSet(); - Preconditions.checkState(outputSlotIds != null); - // step1: change output slot id to src slot id - if (vSrcToOutputSMap != null) { - for (SlotId slotId : outputSlotIds) { - SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); - Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); - if (srcExpr == null) { - result.add(slotId); - } else { - List<SlotRef> srcSlotRefList = Lists.newArrayList(); - srcExpr.collect(SlotRef.class, srcSlotRefList); - result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); - } - } - } + protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) { // eq conjunct - List<SlotId> eqConjunctSlotIds = Lists.newArrayList(); - Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds); - result.addAll(eqConjunctSlotIds); + List<SlotId> joinConjunctSlotIds = Lists.newArrayList(); + Expr.getIds(eqJoinConjuncts, null, joinConjunctSlotIds); // other conjunct List<SlotId> otherConjunctSlotIds = Lists.newArrayList(); Expr.getIds(otherJoinConjuncts, null, otherConjunctSlotIds); - result.addAll(otherConjunctSlotIds); - // conjunct - List<SlotId> conjunctSlotIds = Lists.newArrayList(); - Expr.getIds(conjuncts, null, conjunctSlotIds); - result.addAll(conjunctSlotIds); - return result; + joinConjunctSlotIds.addAll(otherConjunctSlotIds); + return joinConjunctSlotIds; } @Override @@ -308,7 +285,6 @@ public class HashJoinNode extends JoinNodeBase { List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity).collect(Collectors.toList()); - assignedConjuncts = analyzer.getAssignedConjuncts(); otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); // Only for Vec: create new tuple for join result diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index b10a6a96e6..d1e94c0f9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -40,6 +40,7 @@ import org.apache.doris.thrift.TNullSide; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,6 +51,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public abstract class JoinNodeBase extends PlanNode { private static final Logger LOG = LogManager.getLogger(JoinNodeBase.class); @@ -334,7 +336,7 @@ public abstract class JoinNodeBase extends PlanNode { protected abstract Pair<Boolean, Boolean> needToCopyRightAndLeft(); - protected void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap) {} + protected abstract void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap); protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException { // 1. create new tuple @@ -412,6 +414,34 @@ public abstract class JoinNodeBase extends PlanNode { TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap); } + protected abstract List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer); + + @Override + public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { + Set<SlotId> result = Sets.newHashSet(); + Preconditions.checkState(outputSlotIds != null); + // step1: change output slot id to src slot id + if (vSrcToOutputSMap != null) { + for (SlotId slotId : outputSlotIds) { + SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); + Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); + if (srcExpr == null) { + result.add(slotId); + } else { + List<SlotRef> srcSlotRefList = Lists.newArrayList(); + srcExpr.collect(SlotRef.class, srcSlotRefList); + result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); + } + } + } + result.addAll(computeSlotIdsForJoinConjuncts(analyzer)); + // conjunct + List<SlotId> conjunctSlotIds = Lists.newArrayList(); + Expr.getIds(conjuncts, null, conjunctSlotIds); + result.addAll(conjunctSlotIds); + return result; + } + @Override public void finalize(Analyzer analyzer) throws UserException { super.finalize(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index acd84ac986..eb09d679e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -23,11 +23,9 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.JoinOperator; import org.apache.doris.analysis.SlotId; -import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; -import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; @@ -38,16 +36,12 @@ import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; /** * Nested loop join between left child and right child. @@ -58,6 +52,9 @@ public class NestedLoopJoinNode extends JoinNodeBase { private boolean isOutputLeftSideOnly = false; private List<Expr> runtimeFilterExpr = Lists.newArrayList(); + private List<Expr> joinConjuncts; + + private Expr vJoinConjunct; public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef innerRef) { super(id, "NESTED LOOP JOIN", StatisticalType.NESTED_LOOP_JOIN_NODE, outer, inner, innerRef); @@ -71,29 +68,16 @@ public class NestedLoopJoinNode extends JoinNodeBase { || joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN; } + public void setJoinConjuncts(List<Expr> joinConjuncts) { + this.joinConjuncts = joinConjuncts; + } + @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { - Set<SlotId> result = Sets.newHashSet(); - Preconditions.checkState(outputSlotIds != null); - // step1: change output slot id to src slot id - if (vSrcToOutputSMap != null) { - for (SlotId slotId : outputSlotIds) { - SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); - Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); - if (srcExpr == null) { - result.add(slotId); - } else { - List<SlotRef> srcSlotRefList = Lists.newArrayList(); - srcExpr.collect(SlotRef.class, srcSlotRefList); - result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); - } - } - } + protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) { // conjunct List<SlotId> conjunctSlotIds = Lists.newArrayList(); - Expr.getIds(conjuncts, null, conjunctSlotIds); - result.addAll(conjunctSlotIds); - return result; + Expr.getIds(joinConjuncts, null, conjunctSlotIds); + return conjunctSlotIds; } @Override @@ -161,6 +145,25 @@ public class NestedLoopJoinNode extends JoinNodeBase { LOG.debug("stats NestedLoopJoin: cardinality={}", Long.toString(cardinality)); } + @Override + protected void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap) { + joinConjuncts = Expr.substituteList(joinConjuncts, originToIntermediateSmap, analyzer, false); + if (vJoinConjunct != null) { + vJoinConjunct = + Expr.substituteList(Collections.singletonList(vJoinConjunct), originToIntermediateSmap, analyzer, + false).get(0); + } + } + + @Override + public void convertToVectoriezd() { + if (!joinConjuncts.isEmpty()) { + vJoinConjunct = convertConjunctsToAndCompoundPredicate(joinConjuncts); + initCompoundPredicate(vJoinConjunct); + } + super.convertToVectoriezd(); + } + @Override protected String debugString() { return MoreObjects.toStringHelper(this).addValue(super.debugString()).toString(); @@ -170,6 +173,9 @@ public class NestedLoopJoinNode extends JoinNodeBase { protected void toThrift(TPlanNode msg) { msg.nested_loop_join_node = new TNestedLoopJoinNode(); msg.nested_loop_join_node.join_op = joinOp.toThrift(); + if (vJoinConjunct != null) { + msg.nested_loop_join_node.setVjoinConjunct(vJoinConjunct.treeToThrift()); + } if (vSrcToOutputSMap != null) { for (int i = 0; i < vSrcToOutputSMap.size(); i++) { // TODO: Enable it after we support new optimizers @@ -196,6 +202,8 @@ public class NestedLoopJoinNode extends JoinNodeBase { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); + ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); + joinConjuncts = Expr.substituteList(joinConjuncts, combinedChildSmap, analyzer, false); computeCrossRuntimeFilterExpr(); // Only for Vec: create new tuple for join result @@ -226,6 +234,10 @@ public class NestedLoopJoinNode extends JoinNodeBase { return output.toString(); } + if (!joinConjuncts.isEmpty()) { + output.append(detailPrefix).append("join conjuncts: ").append(getExplainString(joinConjuncts)).append("\n"); + } + if (!conjuncts.isEmpty()) { output.append(detailPrefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8bf52d3730..e9b50549c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -2065,14 +2065,9 @@ public class SingleNodePlanner { } analyzer.markConjunctsAssigned(ojConjuncts); if (eqJoinConjuncts.isEmpty()) { - // construct cross join node - // LOG.debug("Join between {} and {} requires at least one conjunctive" - // + " equality predicate between the two tables", - // outerRef.getAliasAsName(), innerRef.getAliasAsName()); - // TODO If there are eq join predicates then we should construct a hash join NestedLoopJoinNode result = new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner, innerRef); - result.addConjuncts(ojConjuncts); + result.setJoinConjuncts(ojConjuncts); result.init(analyzer); return result; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 2ba5c0c6b2..2725c67acc 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -615,6 +615,8 @@ struct TNestedLoopJoinNode { // for bitmap filer, don't need to join, but output left child tuple 5: optional bool is_output_left_side_only + + 6: optional Exprs.TExpr vjoin_conjunct } struct TMergeJoinNode { diff --git a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out index 5cde383a94..dfe581bd65 100644 --- a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out +++ b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out @@ -75,3 +75,15 @@ 2 2 4 4 3 3 4 4 +-- !join -- +1 1 2 1 + +-- !join -- +1 1 2 1 + +-- !join -- +1 1 2 1 + +-- !join -- +1 1 2 1 + diff --git a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy index 5ed53d0402..ad19e55469 100644 --- a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy +++ b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy @@ -103,7 +103,23 @@ suite("test_nestedloop_outer_join", "query_p0") { select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id < ${tbl2}.user_id order by ${tbl1}.user_id, ${tbl2}.user_id; """ + sql """ INSERT INTO ${tbl2} VALUES (2, 1); """ + qt_join """ + select * from ${tbl1} full outer join ${tbl2} on ${tbl1}.user_id < ${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by ${tbl1}.user_id, ${tbl2}.user_id; + """ + + qt_join """ + select * from ${tbl1} right outer join ${tbl2} on ${tbl1}.user_id < ${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by ${tbl1}.user_id, ${tbl2}.user_id; + """ + + qt_join """ + select * from ${tbl1} left outer join ${tbl2} on ${tbl1}.user_id < ${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by ${tbl1}.user_id, ${tbl2}.user_id; + """ + + qt_join """ + select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id < ${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by ${tbl1}.user_id, ${tbl2}.user_id; + """ sql "DROP TABLE IF EXISTS ${tbl1}" sql "DROP TABLE IF EXISTS ${tbl2}" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org