This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7513c82431 [NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608)
7513c82431 is described below

commit 7513c82431b4aa2c6d8ab45ddfb6874e3c4fa7fd
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Nov 29 08:55:54 2022 +0800

    [NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608)
---
 be/src/util/runtime_profile.h                      |  2 +-
 be/src/vec/exec/join/vhash_join_node.cpp           | 65 +++++++---------
 be/src/vec/exec/join/vhash_join_node.h             | 15 +---
 be/src/vec/exec/join/vjoin_node_base.cpp           | 12 +++
 be/src/vec/exec/join/vjoin_node_base.h             |  9 +++
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 87 ++++++++++++++++++++--
 be/src/vec/exec/join/vnested_loop_join_node.h      |  6 ++
 .../doris/analysis/BitmapFilterPredicate.java      |  2 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  5 +-
 .../org/apache/doris/planner/HashJoinNode.java     | 34 ++-------
 .../org/apache/doris/planner/JoinNodeBase.java     | 32 +++++++-
 .../apache/doris/planner/NestedLoopJoinNode.java   | 64 +++++++++-------
 .../apache/doris/planner/SingleNodePlanner.java    |  7 +-
 gensrc/thrift/PlanNodes.thrift                     |  2 +
 .../query_p0/join/test_nestedloop_outer_join.out   | 12 +++
 .../join/test_nestedloop_outer_join.groovy         | 16 ++++
 16 files changed, 249 insertions(+), 121 deletions(-)

diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 55b4b2c3d2..62f0365afc 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -539,7 +539,7 @@ public:
         if (counter == nullptr) {
             return;
         }
-        DCHECK(counter->type() == TUnit::TIME_NS);
+        DCHECK_EQ(counter->type(), TUnit::TIME_NS);
         _sw.start();
     }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 2cce0fdff5..35b83e2a8d 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -343,11 +343,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), 
_left_output_slot_flags);
     init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), 
_right_output_slot_flags);
 
-    // only use in outer join as the bool column to mark for function of 
`tuple_is_null`
-    if (_is_outer_join) {
-        _tuple_is_null_left_flag_column = ColumnUInt8::create();
-        _tuple_is_null_right_flag_column = ColumnUInt8::create();
-    }
     return Status::OK();
 }
 
@@ -565,7 +560,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
         return Status::OK();
     }
 
-    _add_tuple_is_null_column(&temp_block);
+    if (_is_outer_join) {
+        _add_tuple_is_null_column(&temp_block);
+    }
     {
         SCOPED_TIMER(_join_filter_timer);
         RETURN_IF_ERROR(
@@ -578,6 +575,30 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
     return st;
 }
 
+void HashJoinNode::_add_tuple_is_null_column(Block* block) {
+    DCHECK(_is_outer_join);
+    auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+    auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+    auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
+    auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
+    auto left_size = left_null_map.size();
+    auto right_size = right_null_map.size();
+
+    if (left_size == 0) {
+        DCHECK_EQ(right_size, block->rows());
+        left_null_map.get_data().resize_fill(right_size, 0);
+    }
+    if (right_size == 0) {
+        DCHECK_EQ(left_size, block->rows());
+        right_null_map.get_data().resize_fill(left_size, 0);
+    }
+
+    block->insert(
+            {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), 
"left_tuples_is_null"});
+    block->insert(
+            {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), 
"right_tuples_is_null"});
+}
+
 void HashJoinNode::_prepare_probe_block() {
     // clear_column_data of _probe_block
     if (!_probe_column_disguise_null.empty()) {
@@ -1051,38 +1072,6 @@ std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
     return results;
 }
 
-void HashJoinNode::_add_tuple_is_null_column(Block* block) {
-    if (_is_outer_join) {
-        auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
-        auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
-        auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
-        auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
-        auto left_size = left_null_map.size();
-        auto right_size = right_null_map.size();
-
-        if (left_size == 0) {
-            DCHECK_EQ(right_size, block->rows());
-            left_null_map.get_data().resize_fill(right_size, 0);
-        }
-        if (right_size == 0) {
-            DCHECK_EQ(left_size, block->rows());
-            right_null_map.get_data().resize_fill(left_size, 0);
-        }
-
-        block->insert({std::move(p0), 
std::make_shared<vectorized::DataTypeUInt8>(),
-                       "left_tuples_is_null"});
-        block->insert({std::move(p1), 
std::make_shared<vectorized::DataTypeUInt8>(),
-                       "right_tuples_is_null"});
-    }
-}
-
-void HashJoinNode::_reset_tuple_is_null_column() {
-    if (_is_outer_join) {
-        
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
-        
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
-    }
-}
-
 HashJoinNode::~HashJoinNode() {
     if (_shared_hashtable_controller && _should_build_hash_table) {
         _shared_hashtable_controller->signal(id());
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 1a35bb2652..a41d3c2f7b 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -251,8 +251,6 @@ private:
     RuntimeProfile::Counter* _build_side_compute_hash_timer;
     RuntimeProfile::Counter* _build_side_merge_block_timer;
 
-    RuntimeProfile::Counter* _join_filter_timer;
-
     RuntimeProfile* _build_phase_profile;
 
     int64_t _mem_used;
@@ -289,12 +287,8 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
 
-    MutableColumnPtr _tuple_is_null_left_flag_column;
-    MutableColumnPtr _tuple_is_null_right_flag_column;
-
     SharedHashTableContextPtr _shared_hash_table_context = nullptr;
 
-private:
     Status _materialize_build_side(RuntimeState* state) override;
 
     Status _process_build_block(RuntimeState* state, Block& block, uint8_t 
offset);
@@ -317,16 +311,13 @@ private:
 
     void _prepare_probe_block();
 
-    // add tuple is null flag column to Block for filter conjunct and output 
expr
-    void _add_tuple_is_null_column(Block* block);
-
-    // reset the tuple is null flag column for the next call
-    void _reset_tuple_is_null_column();
-
     static std::vector<uint16_t> _convert_block_to_null(Block& block);
 
     void _release_mem();
 
+    // add tuple is null flag column to Block for filter conjunct and output 
expr
+    void _add_tuple_is_null_column(Block* block) override;
+
     template <class HashTableContext>
     friend struct ProcessHashTableBuild;
 
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index 731b383340..d5d5280f57 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -144,6 +144,11 @@ Status VJoinNodeBase::init(const TPlanNode& tnode, RuntimeState* state) {
             _output_expr_ctxs.push_back(ctx);
         }
     }
+    // only use in outer join as the bool column to mark for function of 
`tuple_is_null`
+    if (_is_outer_join) {
+        _tuple_is_null_left_flag_column = ColumnUInt8::create();
+        _tuple_is_null_right_flag_column = ColumnUInt8::create();
+    }
     return ExecNode::init(tnode, state);
 }
 
@@ -173,6 +178,13 @@ Status VJoinNodeBase::open(RuntimeState* state) {
     return status;
 }
 
+void VJoinNodeBase::_reset_tuple_is_null_column() {
+    if (_is_outer_join) {
+        
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
+        
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
+    }
+}
+
 void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state, 
std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VJoinNodeBase::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h
index 81869c373d..f2bdc6eced 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -68,6 +68,11 @@ protected:
     // Initialize the join operation.
     void _init_join_op();
 
+    virtual void _add_tuple_is_null_column(Block* block) = 0;
+
+    // reset the tuple is null flag column for the next call
+    void _reset_tuple_is_null_column();
+
     // Materialize build relation. For HashJoin, it will build a hash table 
while a list of build blocks for NLJoin.
     virtual Status _materialize_build_side(RuntimeState* state) = 0;
 
@@ -97,12 +102,16 @@ protected:
 
     Block _join_block;
 
+    MutableColumnPtr _tuple_is_null_left_flag_column;
+    MutableColumnPtr _tuple_is_null_right_flag_column;
+
     RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _probe_timer;
     RuntimeProfile::Counter* _build_rows_counter;
     RuntimeProfile::Counter* _probe_rows_counter;
     RuntimeProfile::Counter* _push_down_timer;
     RuntimeProfile::Counter* _push_compute_timer;
+    RuntimeProfile::Counter* _join_filter_timer;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 356a9a7a5a..246122d5df 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -83,6 +83,12 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _is_output_left_side_only = 
tnode.nested_loop_join_node.is_output_left_side_only;
     }
 
+    if (tnode.nested_loop_join_node.__isset.vjoin_conjunct) {
+        _vjoin_conjunct_ptr.reset(new VExprContext*);
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, 
tnode.nested_loop_join_node.vjoin_conjunct,
+                                                _vjoin_conjunct_ptr.get()));
+    }
+
     std::vector<TExpr> filter_src_exprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
@@ -106,6 +112,7 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
     _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
     _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
+    _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
 
     // pre-compute the tuple index of build tuples in the output row
     int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
@@ -116,6 +123,9 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
         RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx);
     }
 
+    if (_vjoin_conjunct_ptr) {
+        RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->prepare(state, 
*_intermediate_row_desc));
+    }
     _num_probe_side_columns = child(0)->row_desc().num_materialized_slots();
     _num_build_side_columns = child(1)->row_desc().num_materialized_slots();
     RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, 
*_intermediate_row_desc));
@@ -132,6 +142,7 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) {
     }
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VNestedLoopJoinNode::close");
     VExpr::close(_filter_src_expr_ctxs, state);
+    if (_vjoin_conjunct_ptr) (*_vjoin_conjunct_ptr)->close(state);
     _release_mem();
 
     return VJoinNodeBase::close(state);
@@ -191,9 +202,9 @@ Status VNestedLoopJoinNode::get_left_side(RuntimeState* state, Block* block) {
 Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* 
eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
                                  "VNestedLoopJoinNode::get_next");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_TIMER(_probe_timer);
     RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
     if (_is_output_left_side_only) {
@@ -249,6 +260,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                         Status status = 
_do_filtering_and_update_visited_flags<set_build_side_flag,
                                                                                
set_probe_side_flag>(
                                 &tmp_block, offset_stack, !_is_left_semi_anti);
+                        _update_tuple_is_null_column(&tmp_block);
                         if (!status.OK()) {
                             return status;
                         }
@@ -277,6 +289,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                     Status status = 
_do_filtering_and_update_visited_flags<set_build_side_flag,
                                                                            
set_probe_side_flag>(
                             &tmp_block, offset_stack, !_is_right_semi_anti);
+                    _update_tuple_is_null_column(&tmp_block);
                     mutable_join_block = MutableBlock(std::move(tmp_block));
                     if (!status.OK()) {
                         return status;
@@ -300,7 +313,16 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                    : _matched_rows_done;
 
     Block tmp_block = mutable_join_block.to_block(0);
+    if (_is_outer_join) {
+        _add_tuple_is_null_column(&tmp_block);
+    }
+    {
+        SCOPED_TIMER(_join_filter_timer);
+        RETURN_IF_ERROR(
+                VExprContext::filter_block(_vconjunct_ctx_ptr, &tmp_block, 
tmp_block.columns()));
+    }
     RETURN_IF_ERROR(_build_output_block(&tmp_block, block));
+    _reset_tuple_is_null_column();
     reached_limit(block, eos);
     return Status::OK();
 }
@@ -344,8 +366,38 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns,
     }
 }
 
+void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) {
+    if (_is_outer_join) {
+        auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+        auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+        auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
+        auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
+        auto left_size = left_null_map.size();
+        auto right_size = right_null_map.size();
+
+        if (left_size < block->rows()) {
+            left_null_map.get_data().resize_fill(block->rows(), 0);
+        }
+        if (right_size < block->rows()) {
+            right_null_map.get_data().resize_fill(block->rows(), 0);
+        }
+    }
+}
+
+void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) {
+    DCHECK(_is_outer_join);
+    auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+    auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+    block->insert(
+            {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), 
"left_tuples_is_null"});
+    block->insert(
+            {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), 
"right_tuples_is_null"});
+}
+
 template <bool BuildSide, bool IsSemi>
 void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, 
size_t batch_size) {
+    DCHECK_GT(dst_columns.size(), 0);
+    auto pre_size = dst_columns[0]->size();
     if constexpr (BuildSide) {
         auto build_block_sz = _build_blocks.size();
         size_t i = _output_null_idx_build_side;
@@ -378,6 +430,15 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
                 dst_columns[j]->insert_many_defaults(selector_idx);
             }
 
+            if (_is_outer_join) {
+                
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get())
+                        ->get_data()
+                        .resize_fill(pre_size + selector_idx, 1);
+                
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get())
+                        ->get_data()
+                        .resize_fill(pre_size + selector_idx, 0);
+            }
+
             for (size_t j = 0; j < _num_build_side_columns; ++j) {
                 auto src_column = cur_block.get_by_position(j);
                 if (!src_column.column->is_nullable() &&
@@ -403,6 +464,7 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
                 i++;
                 break;
             }
+            pre_size = dst_columns[0]->size();
         }
         _output_null_idx_build_side = i;
     } else {
@@ -437,6 +499,14 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
         for (size_t i = 0; i < _num_build_side_columns; ++i) {
             dst_columns[_num_probe_side_columns + i]->insert_default();
         }
+        if (_is_outer_join) {
+            
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get())
+                    ->get_data()
+                    .resize_fill(pre_size + 1, 0);
+            
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get())
+                    ->get_data()
+                    .resize_fill(pre_size + 1, 1);
+        }
     }
 }
 
@@ -457,10 +527,10 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
     size_t build_block_idx =
             _current_build_pos == 0 ? _build_blocks.size() - 1 : 
_current_build_pos - 1;
     size_t processed_blocks_num = offset_stack.size();
-    if (LIKELY(_vconjunct_ctx_ptr != nullptr && block->rows() > 0)) {
-        DCHECK((*_vconjunct_ctx_ptr) != nullptr);
+    if (LIKELY(_vjoin_conjunct_ptr != nullptr && block->rows() > 0)) {
+        DCHECK((*_vjoin_conjunct_ptr) != nullptr);
         int result_column_id = -1;
-        RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->execute(block, 
&result_column_id));
+        RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->execute(block, 
&result_column_id));
         ColumnPtr filter_column = 
block->get_by_position(result_column_id).column;
         if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
             ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
@@ -512,7 +582,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
             bool ret = const_column->get_bool(0);
             if (!ret) {
                 for (size_t i = 0; i < column_to_keep; ++i) {
-                    
std::move(*block->get_by_position(i).column).assume_mutable()->clear();
+                    
block->get_by_position(i).column->assume_mutable()->clear();
                 }
             } else {
                 if constexpr (SetBuildSideFlag) {
@@ -569,6 +639,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
         }
     }
 #undef CLEAR_BLOCK
+    Block::erase_useless_column(block, column_to_keep);
     return Status::OK();
 }
 
@@ -576,6 +647,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VNestedLoopJoinNode::open")
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VExpr::open(_filter_src_expr_ctxs, state));
+    if (_vjoin_conjunct_ptr) {
+        RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state));
+    }
     RETURN_IF_ERROR(VJoinNodeBase::open(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
@@ -602,6 +676,9 @@ void VNestedLoopJoinNode::_release_mem() {
 
     MutableColumns tmp_build_side_visited_flags;
     _build_side_visited_flags.swap(tmp_build_side_visited_flags);
+
+    _tuple_is_null_left_flag_column = nullptr;
+    _tuple_is_null_right_flag_column = nullptr;
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h
index 7f47b7959e..45644f9e8e 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -83,6 +83,11 @@ private:
 
     Status get_left_side(RuntimeState* state, Block* block);
 
+    // add tuple is null flag column to Block for filter conjunct and output 
expr
+    void _update_tuple_is_null_column(Block* block);
+
+    void _add_tuple_is_null_column(Block* block) override;
+
     // List of build blocks, constructed in prepare()
     Blocks _build_blocks;
     // Visited flags for each row in build side.
@@ -115,6 +120,7 @@ private:
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     std::vector<vectorized::VExprContext*> _filter_src_expr_ctxs;
     bool _is_output_left_side_only = false;
+    std::unique_ptr<VExprContext*> _vjoin_conjunct_ptr;
 
     friend struct RuntimeFilterBuild;
 };
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
index d7e0463016..1802f0917b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
@@ -90,7 +90,7 @@ public class BitmapFilterPredicate extends Predicate {
 
     @Override
     protected void toThrift(TExprNode msg) {
-        // Unreachable
+        Preconditions.checkArgument(false, "`toThrift` in 
BitmapFilterPredicate should not be reached!");
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index ea368bfedf..235f0265b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -896,8 +896,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
             connectChildFragment(nestedLoopJoinNode, 1, leftFragment, 
rightFragment, context);
             leftFragment.setPlanRoot(nestedLoopJoinNode);
-            nestedLoopJoin.getOtherJoinConjuncts().stream()
-                    .map(e -> ExpressionTranslator.translate(e, 
context)).forEach(nestedLoopJoinNode::addConjunct);
+            List<Expr> joinConjuncts = 
nestedLoopJoin.getOtherJoinConjuncts().stream()
+                    .map(e -> ExpressionTranslator.translate(e, 
context)).collect(Collectors.toList());
+            nestedLoopJoinNode.setJoinConjuncts(joinConjuncts);
 
             if (nestedLoopJoin.isShouldTranslateOutput()) {
                 // translate output expr on intermediate tuple
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index ac86bf7bd7..8eb8e85256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -35,7 +35,6 @@ import org.apache.doris.catalog.ColumnStats;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.CheckedMath;
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
@@ -266,38 +265,16 @@ public class HashJoinNode extends JoinNodeBase {
         }
     }
 
-    // output slots + predicate slots = input slots
     @Override
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws 
NotImplementedException {
-        Set<SlotId> result = Sets.newHashSet();
-        Preconditions.checkState(outputSlotIds != null);
-        // step1: change output slot id to src slot id
-        if (vSrcToOutputSMap != null) {
-            for (SlotId slotId : outputSlotIds) {
-                SlotRef slotRef = new 
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
-                Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
-                if (srcExpr == null) {
-                    result.add(slotId);
-                } else {
-                    List<SlotRef> srcSlotRefList = Lists.newArrayList();
-                    srcExpr.collect(SlotRef.class, srcSlotRefList);
-                    result.addAll(srcSlotRefList.stream().map(e -> 
e.getSlotId()).collect(Collectors.toList()));
-                }
-            }
-        }
+    protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) {
         // eq conjunct
-        List<SlotId> eqConjunctSlotIds = Lists.newArrayList();
-        Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds);
-        result.addAll(eqConjunctSlotIds);
+        List<SlotId> joinConjunctSlotIds = Lists.newArrayList();
+        Expr.getIds(eqJoinConjuncts, null, joinConjunctSlotIds);
         // other conjunct
         List<SlotId> otherConjunctSlotIds = Lists.newArrayList();
         Expr.getIds(otherJoinConjuncts, null, otherConjunctSlotIds);
-        result.addAll(otherConjunctSlotIds);
-        // conjunct
-        List<SlotId> conjunctSlotIds = Lists.newArrayList();
-        Expr.getIds(conjuncts, null, conjunctSlotIds);
-        result.addAll(conjunctSlotIds);
-        return result;
+        joinConjunctSlotIds.addAll(otherConjunctSlotIds);
+        return joinConjunctSlotIds;
     }
 
     @Override
@@ -308,7 +285,6 @@ public class HashJoinNode extends JoinNodeBase {
         List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, 
combinedChildSmap, analyzer, false);
         eqJoinConjuncts =
                 newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) 
entity).collect(Collectors.toList());
-        assignedConjuncts = analyzer.getAssignedConjuncts();
         otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, 
combinedChildSmap, analyzer, false);
 
         // Only for Vec: create new tuple for join result
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index b10a6a96e6..d1e94c0f9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TNullSide;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -50,6 +51,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public abstract class JoinNodeBase extends PlanNode {
     private static final Logger LOG = LogManager.getLogger(JoinNodeBase.class);
@@ -334,7 +336,7 @@ public abstract class JoinNodeBase extends PlanNode {
 
     protected abstract Pair<Boolean, Boolean> needToCopyRightAndLeft();
 
-    protected void computeOtherConjuncts(Analyzer analyzer, 
ExprSubstitutionMap originToIntermediateSmap) {}
+    protected abstract void computeOtherConjuncts(Analyzer analyzer, 
ExprSubstitutionMap originToIntermediateSmap);
 
     protected void computeIntermediateTuple(Analyzer analyzer) throws 
AnalysisException {
         // 1. create new tuple
@@ -412,6 +414,34 @@ public abstract class JoinNodeBase extends PlanNode {
         
TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), 
originTidsToIntermediateTidMap);
     }
 
+    protected abstract List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer 
analyzer);
+
+    @Override
+    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws 
NotImplementedException {
+        Set<SlotId> result = Sets.newHashSet();
+        Preconditions.checkState(outputSlotIds != null);
+        // step1: change output slot id to src slot id
+        if (vSrcToOutputSMap != null) {
+            for (SlotId slotId : outputSlotIds) {
+                SlotRef slotRef = new 
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
+                Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
+                if (srcExpr == null) {
+                    result.add(slotId);
+                } else {
+                    List<SlotRef> srcSlotRefList = Lists.newArrayList();
+                    srcExpr.collect(SlotRef.class, srcSlotRefList);
+                    result.addAll(srcSlotRefList.stream().map(e -> 
e.getSlotId()).collect(Collectors.toList()));
+                }
+            }
+        }
+        result.addAll(computeSlotIdsForJoinConjuncts(analyzer));
+        // conjunct
+        List<SlotId> conjunctSlotIds = Lists.newArrayList();
+        Expr.getIds(conjuncts, null, conjunctSlotIds);
+        result.addAll(conjunctSlotIds);
+        return result;
+    }
+
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         super.finalize(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index acd84ac986..eb09d679e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -23,11 +23,9 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.SlotId;
-import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
@@ -38,16 +36,12 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Nested loop join between left child and right child.
@@ -58,6 +52,9 @@ public class NestedLoopJoinNode extends JoinNodeBase {
     private boolean isOutputLeftSideOnly = false;
 
     private List<Expr> runtimeFilterExpr = Lists.newArrayList();
+    private List<Expr> joinConjuncts;
+
+    private Expr vJoinConjunct;
 
     public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, 
TableRef innerRef) {
         super(id, "NESTED LOOP JOIN", StatisticalType.NESTED_LOOP_JOIN_NODE, 
outer, inner, innerRef);
@@ -71,29 +68,16 @@ public class NestedLoopJoinNode extends JoinNodeBase {
                 || joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
     }
 
+    public void setJoinConjuncts(List<Expr> joinConjuncts) {
+        this.joinConjuncts = joinConjuncts;
+    }
+
     @Override
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws 
NotImplementedException {
-        Set<SlotId> result = Sets.newHashSet();
-        Preconditions.checkState(outputSlotIds != null);
-        // step1: change output slot id to src slot id
-        if (vSrcToOutputSMap != null) {
-            for (SlotId slotId : outputSlotIds) {
-                SlotRef slotRef = new 
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
-                Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
-                if (srcExpr == null) {
-                    result.add(slotId);
-                } else {
-                    List<SlotRef> srcSlotRefList = Lists.newArrayList();
-                    srcExpr.collect(SlotRef.class, srcSlotRefList);
-                    result.addAll(srcSlotRefList.stream().map(e -> 
e.getSlotId()).collect(Collectors.toList()));
-                }
-            }
-        }
+    protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) {
         // conjunct
         List<SlotId> conjunctSlotIds = Lists.newArrayList();
-        Expr.getIds(conjuncts, null, conjunctSlotIds);
-        result.addAll(conjunctSlotIds);
-        return result;
+        Expr.getIds(joinConjuncts, null, conjunctSlotIds);
+        return conjunctSlotIds;
     }
 
     @Override
@@ -161,6 +145,25 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         LOG.debug("stats NestedLoopJoin: cardinality={}", 
Long.toString(cardinality));
     }
 
+    @Override
+    protected void computeOtherConjuncts(Analyzer analyzer, 
ExprSubstitutionMap originToIntermediateSmap) {
+        joinConjuncts = Expr.substituteList(joinConjuncts, 
originToIntermediateSmap, analyzer, false);
+        if (vJoinConjunct != null) {
+            vJoinConjunct =
+                    
Expr.substituteList(Collections.singletonList(vJoinConjunct), 
originToIntermediateSmap, analyzer,
+                                    false).get(0);
+        }
+    }
+
+    @Override
+    public void convertToVectoriezd() {
+        if (!joinConjuncts.isEmpty()) {
+            vJoinConjunct = 
convertConjunctsToAndCompoundPredicate(joinConjuncts);
+            initCompoundPredicate(vJoinConjunct);
+        }
+        super.convertToVectoriezd();
+    }
+
     @Override
     protected String debugString() {
         return 
MoreObjects.toStringHelper(this).addValue(super.debugString()).toString();
@@ -170,6 +173,9 @@ public class NestedLoopJoinNode extends JoinNodeBase {
     protected void toThrift(TPlanNode msg) {
         msg.nested_loop_join_node = new TNestedLoopJoinNode();
         msg.nested_loop_join_node.join_op = joinOp.toThrift();
+        if (vJoinConjunct != null) {
+            
msg.nested_loop_join_node.setVjoinConjunct(vJoinConjunct.treeToThrift());
+        }
         if (vSrcToOutputSMap != null) {
             for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
                 // TODO: Enable it after we support new optimizers
@@ -196,6 +202,8 @@ public class NestedLoopJoinNode extends JoinNodeBase {
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
+        ExprSubstitutionMap combinedChildSmap = 
getCombinedChildWithoutTupleIsNullSmap();
+        joinConjuncts = Expr.substituteList(joinConjuncts, combinedChildSmap, 
analyzer, false);
         computeCrossRuntimeFilterExpr();
 
         // Only for Vec: create new tuple for join result
@@ -226,6 +234,10 @@ public class NestedLoopJoinNode extends JoinNodeBase {
             return output.toString();
         }
 
+        if (!joinConjuncts.isEmpty()) {
+            output.append(detailPrefix).append("join conjuncts: 
").append(getExplainString(joinConjuncts)).append("\n");
+        }
+
         if (!conjuncts.isEmpty()) {
             output.append(detailPrefix).append("predicates: 
").append(getExplainString(conjuncts)).append("\n");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 8bf52d3730..e9b50549c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -2065,14 +2065,9 @@ public class SingleNodePlanner {
         }
         analyzer.markConjunctsAssigned(ojConjuncts);
         if (eqJoinConjuncts.isEmpty()) {
-            // construct cross join node
-            // LOG.debug("Join between {} and {} requires at least one 
conjunctive"
-            //        + " equality predicate between the two tables",
-            //        outerRef.getAliasAsName(), innerRef.getAliasAsName());
-            // TODO If there are eq join predicates then we should construct a 
hash join
             NestedLoopJoinNode result =
                     new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner, 
innerRef);
-            result.addConjuncts(ojConjuncts);
+            result.setJoinConjuncts(ojConjuncts);
             result.init(analyzer);
             return result;
         }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2ba5c0c6b2..2725c67acc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -615,6 +615,8 @@ struct TNestedLoopJoinNode {
 
   // for bitmap filer, don't need to join, but output left child tuple
   5: optional bool is_output_left_side_only
+
+  6: optional Exprs.TExpr vjoin_conjunct
 }
 
 struct TMergeJoinNode {
diff --git a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
index 5cde383a94..dfe581bd65 100644
--- a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
+++ b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
@@ -75,3 +75,15 @@
 2      2       4       4
 3      3       4       4
 
+-- !join --
+1      1       2       1
+
+-- !join --
+1      1       2       1
+
+-- !join --
+1      1       2       1
+
+-- !join --
+1      1       2       1
+
diff --git a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
index 5ed53d0402..ad19e55469 100644
--- a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
+++ b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
@@ -103,7 +103,23 @@ suite("test_nestedloop_outer_join", "query_p0") {
         select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id < 
${tbl2}.user_id order by ${tbl1}.user_id, ${tbl2}.user_id;
     """
 
+    sql """ INSERT INTO ${tbl2} VALUES (2, 1); """
 
+    qt_join """
+        select * from ${tbl1} full outer join ${tbl2} on ${tbl1}.user_id < 
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by 
${tbl1}.user_id, ${tbl2}.user_id;
+    """
+
+    qt_join """
+        select * from ${tbl1} right outer join ${tbl2} on ${tbl1}.user_id < 
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by 
${tbl1}.user_id, ${tbl2}.user_id;
+    """
+
+    qt_join """
+        select * from ${tbl1} left outer join ${tbl2} on ${tbl1}.user_id < 
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by 
${tbl1}.user_id, ${tbl2}.user_id;
+    """
+
+    qt_join """
+        select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id < 
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by 
${tbl1}.user_id, ${tbl2}.user_id;
+    """
 
     sql "DROP TABLE IF EXISTS ${tbl1}"
     sql "DROP TABLE IF EXISTS ${tbl2}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to