This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bfba058ecf [Feature](join) Support null aware left anti join (#13871)
bfba058ecf is described below

commit bfba058ecf4b3c41ee4d7f5eced22bf9ab8f74a5
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Nov 3 12:11:25 2022 +0800

    [Feature](join) Support null aware left anti join (#13871)
---
 be/src/vec/exec/join/vhash_join_node.cpp           | 98 +++++++++++++++-------
 be/src/vec/exec/join/vhash_join_node.h             | 18 ++--
 .../org/apache/doris/analysis/JoinOperator.java    |  3 +-
 .../org/apache/doris/analysis/StmtRewriter.java    |  8 +-
 .../java/org/apache/doris/analysis/TableRef.java   |  2 +
 .../apache/doris/nereids/trees/plans/JoinType.java | 13 ++-
 .../doris/planner/DistributedPlanColocateRule.java |  2 +
 .../apache/doris/planner/DistributedPlanner.java   | 30 +++++--
 .../doris/planner/RuntimeFilterGenerator.java      |  3 +-
 .../org/apache/doris/rewrite/ExprRewriter.java     |  4 +-
 .../org/apache/doris/rewrite/InferFiltersRule.java |  6 +-
 .../test_null_aware_left_anti_join.out             | 10 +++
 .../test_null_aware_left_anti_join.groovy          | 66 +++++++++++++++
 .../tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy   |  4 +-
 14 files changed, 207 insertions(+), 60 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index eb419eb52f..a478df8744 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -59,8 +59,8 @@ struct ProcessHashTableBuild {
               
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}
 
     template <bool need_null_map_for_build, bool ignore_null, bool 
build_unique,
-              bool has_runtime_filter>
-    void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map) {
+              bool has_runtime_filter, bool short_circuit_for_null>
+    void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* 
has_null_key) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
         int64_t old_bucket_bytes = 
hash_table_ctx.hash_table.get_buffer_size_in_bytes();
@@ -97,6 +97,15 @@ struct ProcessHashTableBuild {
                         continue;
                     }
                 }
+                // If the short-circuit strategy for null values applies (e.g. the join operator is
+                // NULL_AWARE_LEFT_ANTI_JOIN), we stop building the hash table at the first null value.
+                if constexpr (short_circuit_for_null && 
need_null_map_for_build) {
+                    if ((*null_map)[k]) {
+                        DCHECK(has_null_key);
+                        *has_null_key = true;
+                        return;
+                    }
+                }
                 if constexpr 
(IsSerializedHashTableContextTraits<KeyGetter>::value) {
                     _build_side_hash_values[k] =
                             
hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key);
@@ -218,6 +227,7 @@ void ProcessHashTableProbe<JoinOpType, 
ignore_null>::build_side_output_column(
     constexpr auto is_semi_anti_join = JoinOpType::value == 
TJoinOp::RIGHT_ANTI_JOIN ||
                                        JoinOpType::value == 
TJoinOp::RIGHT_SEMI_JOIN ||
                                        JoinOpType::value == 
TJoinOp::LEFT_ANTI_JOIN ||
+                                       JoinOpType::value == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
                                        JoinOpType::value == 
TJoinOp::LEFT_SEMI_JOIN;
 
     constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
@@ -380,7 +390,8 @@ Status ProcessHashTableProbe<JoinOpType, 
ignore_null>::do_process(HashTableType&
                 key_getter.template prefetch<true>(hash_table_ctx.hash_table,
                                                    probe_index + 
PREFETCH_STEP, _arena);
 
-            if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+            if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
+                          JoinOpType::value == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
                 if (!find_result.is_found()) {
                     ++current_offset;
                 }
@@ -575,7 +586,8 @@ Status ProcessHashTableProbe<JoinOpType, 
ignore_null>::do_process_with_other_joi
                 }
             } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN 
||
                                  JoinOpType::value == TJoinOp::FULL_OUTER_JOIN 
||
-                                 JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) 
{
+                                 JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN 
||
+                                 JoinOpType::value == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
                 same_to_prev.emplace_back(false);
                 visited_map.emplace_back(nullptr);
                 // only full outer / left outer need insert the data of right 
table
@@ -682,16 +694,23 @@ Status ProcessHashTableProbe<JoinOpType, 
ignore_null>::do_process_with_other_joi
 
                 output_block->get_by_position(result_column_id).column =
                         std::move(new_filter_column);
-            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) 
{
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN 
||
+                                 JoinOpType::value == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
                 auto new_filter_column = ColumnVector<UInt8>::create();
                 auto& filter_map = new_filter_column->get_data();
 
                 if (!column->empty()) {
+                    // Both equal conjuncts and other conjuncts are true
                     filter_map.emplace_back(column->get_bool(0) && 
visited_map[0]);
                 }
                 for (int i = 1; i < column->size(); ++i) {
                     if ((visited_map[i] && column->get_bool(i)) ||
                         (same_to_prev[i] && filter_map[i - 1])) {
+                        // When either of the following two conditions is met:
+                        // 1. Both the equal conjuncts and the other conjuncts are true for this row
+                        // 2. This row is joined from the same build side row as the previous row,
+                        //    and the previous row passed the filter
+                        // set filter_map[i] to true, and set filter_map[i - 1] to false if same_to_prev[i] is true.
                         filter_map.push_back(true);
                         filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 
1];
                     } else {
@@ -731,8 +750,10 @@ Status ProcessHashTableProbe<JoinOpType, 
ignore_null>::do_process_with_other_joi
                 output_block->clear();
             } else {
                 if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN ||
-                              JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
+                              JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
+                              JoinOpType::value == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
                     orig_columns = right_col_idx;
+                }
                 Block::filter_block(output_block, result_column_id, 
orig_columns);
             }
         }
@@ -828,14 +849,16 @@ Status ProcessHashTableProbe<JoinOpType, 
ignore_null>::process_data_in_hashtable
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
           _join_op(tnode.hash_join_node.join_op),
-          _hash_table_rows(0),
           _mem_used(0),
+          
_have_other_join_conjunct(tnode.hash_join_node.__isset.vother_join_conjunct),
           _match_all_probe(_join_op == TJoinOp::LEFT_OUTER_JOIN ||
                            _join_op == TJoinOp::FULL_OUTER_JOIN),
-          _match_one_build(_join_op == TJoinOp::LEFT_SEMI_JOIN),
           _match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                            _join_op == TJoinOp::FULL_OUTER_JOIN),
-          _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN),
+          _build_unique(!_have_other_join_conjunct &&
+                        (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+                         _join_op == TJoinOp::LEFT_ANTI_JOIN ||
+                         _join_op == TJoinOp::LEFT_SEMI_JOIN)),
           _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
                               _join_op == TJoinOp::RIGHT_SEMI_JOIN),
           _is_outer_join(_match_all_build || _match_all_probe),
@@ -874,17 +897,17 @@ void HashJoinNode::init_join_op() {
 Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     DCHECK(tnode.__isset.hash_join_node);
-    if (tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        return Status::InternalError("Do not support null aware left anti 
join");
-    }
 
     const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
                                    _join_op == TJoinOp::FULL_OUTER_JOIN ||
                                    _join_op == TJoinOp::RIGHT_ANTI_JOIN;
     const bool probe_dispose_null =
-            _match_all_probe || _build_unique || _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+            _match_all_probe || _build_unique || _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+            _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN;
 
     const std::vector<TEqJoinCondition>& eq_join_conjuncts = 
tnode.hash_join_node.eq_join_conjuncts;
+    std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
+    size_t conjuncts_index = 0;
     for (const auto& eq_join_conjunct : eq_join_conjuncts) {
         VExprContext* ctx = nullptr;
         RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, 
&ctx));
@@ -897,17 +920,20 @@ Status HashJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
         _is_null_safe_eq_join.push_back(null_aware);
 
         // if is null aware, build join column and probe join column both need 
dispose null value
-        _build_not_ignore_null.emplace_back(
+        _store_null_in_hash_table.emplace_back(
                 null_aware ||
                 (_build_expr_ctxs.back()->root()->is_nullable() && 
build_stores_null));
-        _probe_not_ignore_null.emplace_back(
+        probe_not_ignore_null[conjuncts_index] =
                 null_aware ||
-                (_probe_expr_ctxs.back()->root()->is_nullable() && 
probe_dispose_null));
-        _build_side_ignore_null |= !_build_not_ignore_null.back();
+                (_probe_expr_ctxs.back()->root()->is_nullable() && 
probe_dispose_null);
+        _build_side_ignore_null |= (_join_op != 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+                                    !_store_null_in_hash_table.back());
+        conjuncts_index++;
     }
     for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
-        _probe_ignore_null |= !_probe_not_ignore_null[i];
+        _probe_ignore_null |= !probe_not_ignore_null[i];
     }
+    _short_circuit_for_null_in_build_side = _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
 
     _probe_column_disguise_null.reserve(eq_join_conjuncts.size());
 
@@ -918,8 +944,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
         // If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate,
         // build table should not be deduplicated.
-        _build_unique = false;
-        _have_other_join_conjunct = true;
+        DCHECK(!_build_unique);
+        DCHECK(_have_other_join_conjunct);
     }
 
     const auto& output_exprs = tnode.hash_join_node.srcExprList;
@@ -1057,6 +1083,12 @@ Status HashJoinNode::get_next(RuntimeState* state, 
Block* output_block, bool* eo
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_TIMER(_probe_timer);
 
+    if (_short_circuit_for_null_in_probe_side) {
+        // If the build side met a null value under the short-circuit strategy (e.g. the join operator
+        // is NULL_AWARE_LEFT_ANTI_JOIN), the result is empty, so report eos directly.
+        *eos = true;
+        return Status::OK();
+    }
     size_t probe_rows = _probe_block.rows();
     if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) {
         _probe_index = 0;
@@ -1285,7 +1317,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* 
state) {
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
     Block block;
-    while (!eos) {
+    // If eos, or a null value has already been met under the short-circuit strategy, there is no
+    // need to pull more data from the build side.
+    while (!eos && !_short_circuit_for_null_in_probe_side) {
         block.clear_column_data();
         RETURN_IF_CANCELLED(state);
 
@@ -1315,7 +1349,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* 
state) {
         }
     }
 
-    if (!mutable_block.empty()) {
+    if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) {
         if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
             return Status::NotSupported(
                     strings::Substitute("data size of right table in hash join 
> $0",
@@ -1356,7 +1390,7 @@ Status HashJoinNode::_extract_join_column(Block& block, 
ColumnUInt8::MutablePtr&
                     DCHECK(null_map != nullptr);
                     VectorizedUtils::update_null_map(null_map->get_data(), 
col_nullmap);
                 }
-                if (_build_not_ignore_null[i]) {
+                if (_store_null_in_hash_table[i]) {
                     raw_ptrs[i] = nullable;
                 } else {
                     if constexpr (BuildSide) {
@@ -1400,7 +1434,7 @@ bool HashJoinNode::_need_null_map(Block& block, const 
std::vector<int>& res_col_
             auto column = block.get_by_position(res_col_ids[i]).column.get();
             if constexpr (BuildSide) {
                 if (check_and_get_column<ColumnNullable>(*column)) {
-                    if (!_build_not_ignore_null[i]) {
+                    if (!_store_null_in_hash_table[i]) {
                         return true;
                     }
                 }
@@ -1434,7 +1468,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
     //  so we have to initialize this flag by the first build block.
     if (!_has_set_need_null_map_for_build) {
         _has_set_need_null_map_for_build = true;
-        _need_null_map_for_build = _need_null_map<true>(block, res_col_ids);
+        _need_null_map_for_build =
+                _short_circuit_for_null_in_build_side || 
_need_null_map<true>(block, res_col_ids);
     }
     if (_need_null_map_for_build) {
         null_map_val = ColumnUInt8::create();
@@ -1458,21 +1493,24 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
 
     std::visit(
             [&](auto&& arg, auto has_null_value, auto build_unique, auto 
has_runtime_filter_value,
-                auto need_null_map_for_build) {
+                auto need_null_map_for_build, auto 
short_circuit_for_null_in_build_side) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     ProcessHashTableBuild<HashTableCtxType> 
hash_table_build_process(
                             rows, block, raw_ptrs, this, state->batch_size(), 
offset);
                     hash_table_build_process.template 
run<need_null_map_for_build, has_null_value,
-                                                          build_unique, 
has_runtime_filter_value>(
-                            arg, need_null_map_for_build ? 
&null_map_val->get_data() : nullptr);
+                                                          build_unique, 
has_runtime_filter_value,
+                                                          
short_circuit_for_null_in_build_side>(
+                            arg, need_null_map_for_build ? 
&null_map_val->get_data() : nullptr,
+                            &_short_circuit_for_null_in_probe_side);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
             _hash_table_variants, make_bool_variant(_build_side_ignore_null),
             make_bool_variant(_build_unique), 
make_bool_variant(has_runtime_filter),
-            make_bool_variant(_need_null_map_for_build));
+            make_bool_variant(_need_null_map_for_build),
+            make_bool_variant(_short_circuit_for_null_in_build_side));
 
     return st;
 }
@@ -1488,7 +1526,7 @@ void HashJoinNode::_hash_table_init() {
                                                    JoinOpType::value == 
TJoinOp::RIGHT_OUTER_JOIN ||
                                                    JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN,
                                            RowRefListWithFlag, RowRefList>>;
-                if (_build_expr_ctxs.size() == 1 && 
!_build_not_ignore_null[0]) {
+                if (_build_expr_ctxs.size() == 1 && 
!_store_null_in_hash_table[0]) {
                     // Single column optimization
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
                     case TYPE_BOOLEAN:
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 5f84211110..7d29268b39 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -288,7 +288,6 @@ public:
     Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override;
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
     Status close(RuntimeState* state) override;
-    HashTableVariants& get_hash_table_variants() { return 
_hash_table_variants; }
     void init_join_op();
 
     const RowDescriptor& row_desc() const override { return _output_row_desc; }
@@ -311,10 +310,8 @@ private:
     // mark the join column whether support null eq
     std::vector<bool> _is_null_safe_eq_join;
 
-    // mark the build hash table whether contain null column
-    std::vector<bool> _build_not_ignore_null;
-    // mark the probe table should dispose null column
-    std::vector<bool> _probe_not_ignore_null;
+    // mark the build hash table whether it needs to store null value
+    std::vector<bool> _store_null_in_hash_table;
 
     std::vector<uint16_t> _probe_column_disguise_null;
     std::vector<uint16_t> _probe_column_convert_to_null;
@@ -343,7 +340,6 @@ private:
 
     RuntimeProfile::Counter* _join_filter_timer;
 
-    int64_t _hash_table_rows;
     int64_t _mem_used;
 
     Arena _arena;
@@ -368,14 +364,20 @@ private:
     Sizes _probe_key_sz;
     Sizes _build_key_sz;
 
+    bool _have_other_join_conjunct;
     const bool _match_all_probe; // output all rows coming from the probe 
input. Full/Left Join
-    const bool _match_one_build; // match at most one build row to each probe 
row. Left semi Join
     const bool _match_all_build; // output all rows coming from the build 
input. Full/Right Join
     bool _build_unique;          // build a hash table without duplicated 
rows. Left semi/anti Join
 
     const bool _is_right_semi_anti;
     const bool _is_outer_join;
-    bool _have_other_join_conjunct = false;
+
+    // For null aware left anti join, we apply a short circuit strategy.
+    // 1. Set _short_circuit_for_null_in_build_side to true if join operator 
is null aware left anti join.
+    // 2. In the build phase, we stop building the hash table when we meet the first null value and set _short_circuit_for_null_in_probe_side to true.
+    // 3. In the probe phase, if _short_circuit_for_null_in_probe_side is true, the join node returns an empty block directly. Otherwise, probing proceeds the same as for a generic left anti join.
+    bool _short_circuit_for_null_in_build_side = false;
+    bool _short_circuit_for_null_in_probe_side = false;
 
     Block _join_block;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
index 6db551e76d..a01739a78e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
@@ -61,7 +61,8 @@ public enum JoinOperator {
     }
 
     public boolean isSemiAntiJoin() {
-        return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == 
LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
+        return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == 
LEFT_ANTI_JOIN
+                || this == NULL_AWARE_LEFT_ANTI_JOIN || this == 
RIGHT_ANTI_JOIN;
     }
 
     public boolean isSemiJoin() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index 5cdb4bba36..06416c162b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.TableAliasGenerator;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.policy.RowPolicy;
 import org.apache.doris.qe.ConnectContext;
 
@@ -755,8 +756,8 @@ public class StmtRewriter {
             // For the case of a NOT IN with an eq join conjunct, replace the 
join
             // conjunct with a conjunct that uses the null-matching eq 
operator.
             if (expr instanceof InPredicate) {
-                // joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
-                joinOp = JoinOperator.LEFT_ANTI_JOIN;
+                joinOp = VectorizedUtil.isVectorized()
+                        ? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : 
JoinOperator.LEFT_ANTI_JOIN;
                 List<TupleId> tIds = Lists.newArrayList();
                 joinConjunct.getIds(tIds, null);
                 if (tIds.size() <= 1 || 
!tIds.contains(inlineView.getDesc().getId())) {
@@ -804,7 +805,8 @@ public class StmtRewriter {
             for (int j = 0; j < tableIdx; ++j) {
                 TableRef tableRef = stmt.fromClause.get(j);
                 if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN
-                        || tableRef.getJoinOp() == 
JoinOperator.LEFT_ANTI_JOIN) {
+                        || tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
+                        || tableRef.getJoinOp() == 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
                     continue;
                 }
                 
newItems.add(SelectListItem.createStarItem(tableRef.getAliasAsName()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 6a8099c719..0aa5d59291 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -640,6 +640,8 @@ public class TableRef implements ParseNode, Writable {
                 return "FULL OUTER JOIN";
             case CROSS_JOIN:
                 return "CROSS JOIN";
+            case NULL_AWARE_LEFT_ANTI_JOIN:
+                return "NULL AWARE LEFT ANTI JOIN";
             default:
                 return "bad join op: " + joinOp.toString();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
index b9e3d909c1..46eebdf221 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
@@ -37,6 +37,7 @@ public enum JoinType {
     LEFT_ANTI_JOIN,
     RIGHT_ANTI_JOIN,
     CROSS_JOIN,
+    NULL_AWARE_LEFT_ANTI_JOIN,
     ;
 
     private static final Map<JoinType, JoinType> joinSwapMap = ImmutableMap
@@ -71,6 +72,8 @@ public enum JoinType {
                 return JoinOperator.FULL_OUTER_JOIN;
             case LEFT_ANTI_JOIN:
                 return JoinOperator.LEFT_ANTI_JOIN;
+            case NULL_AWARE_LEFT_ANTI_JOIN:
+                return JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
             case RIGHT_ANTI_JOIN:
                 return JoinOperator.RIGHT_ANTI_JOIN;
             case LEFT_SEMI_JOIN:
@@ -97,7 +100,8 @@ public enum JoinType {
     }
 
     public final boolean isLeftJoin() {
-        return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == 
LEFT_SEMI_JOIN;
+        return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == 
NULL_AWARE_LEFT_ANTI_JOIN
+                || this == LEFT_SEMI_JOIN;
     }
 
     public final boolean isRightJoin() {
@@ -117,7 +121,7 @@ public enum JoinType {
     }
 
     public final boolean isLeftSemiOrAntiJoin() {
-        return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN;
+        return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == 
NULL_AWARE_LEFT_ANTI_JOIN;
     }
 
     public final boolean isRightSemiOrAntiJoin() {
@@ -125,7 +129,8 @@ public enum JoinType {
     }
 
     public final boolean isSemiOrAntiJoin() {
-        return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == 
LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
+        return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == 
LEFT_ANTI_JOIN
+                || this == NULL_AWARE_LEFT_ANTI_JOIN || this == 
RIGHT_ANTI_JOIN;
     }
 
     public final boolean isOuterJoin() {
@@ -137,7 +142,7 @@ public enum JoinType {
     }
 
     public final boolean isRemainRightJoin() {
-        return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN;
+        return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN && this != 
NULL_AWARE_LEFT_ANTI_JOIN;
     }
 
     public final boolean isSwapJoinType() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
index 1ff492d0b6..7d3a8cde41 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
@@ -27,4 +27,6 @@ public class DistributedPlanColocateRule {
     public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group 
is not stable";
     public static final String INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY
             = "Inconsistent distribution of table and queries";
+    public static final String NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST
+            = "Build side of null aware left anti join must be broadcast";
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 19e343cc72..d005a6ff3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -353,7 +353,10 @@ public class DistributedPlanner {
         // - and the expected size of the hash tbl doesn't exceed 
autoBroadcastThreshold
         // we set partition join as default when broadcast join cost equals 
partition join cost
 
-        if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN && 
node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
+        if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            doBroadcast = true;
+        } else if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
+                && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
             if (node.getInnerRef().isBroadcastJoin()) {
                 // respect user join hint
                 doBroadcast = true;
@@ -425,26 +428,33 @@ public class DistributedPlanner {
 
     /**
      * Colocate Join can be performed when the following 4 conditions are met 
at the same time.
-     * 1. Session variables disable_colocate_plan = false
-     * 2. There is no join hints in HashJoinNode
-     * 3. There are no exchange node between source scan node and HashJoinNode.
-     * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are 
colocate and group can be matched.
+     * 1. Join operator is not NULL_AWARE_LEFT_ANTI_JOIN
+     * 2. Session variables disable_colocate_plan = false
+     * 3. There is no join hints in HashJoinNode
+     * 4. There are no exchange node between source scan node and HashJoinNode.
+     * 5. The scan nodes which are related by EqConjuncts in HashJoinNode are 
colocate and group can be matched.
      */
     private boolean canColocateJoin(HashJoinNode node, PlanFragment 
leftChildFragment, PlanFragment rightChildFragment,
                                     List<String> cannotReason) {
         // Condition1
+        if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            
cannotReason.add(DistributedPlanColocateRule.NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST);
+            return false;
+        }
+
+        // Condition2
         if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) 
{
             cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED);
             return false;
         }
 
-        // Condition2: If user have a join hint to use proper way of join, can 
not be colocate join
+        // Condition3: If user have a join hint to use proper way of join, can 
not be colocate join
         if (node.getInnerRef().hasJoinHints()) {
             cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT);
             return false;
         }
 
-        // Condition3:
+        // Condition4:
         // If there is an exchange node between the HashJoinNode and their 
real associated ScanNode,
         //   it means that the data has been rehashed.
         // The rehashed data can no longer be guaranteed to correspond to the 
left and right buckets,
@@ -468,7 +478,7 @@ public class DistributedPlanner {
             predicateList.add(eqJoinPredicate);
         }
 
-        // Condition4
+        // Condition5
         return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, 
cannotReason);
     }
 
@@ -581,6 +591,10 @@ public class DistributedPlanner {
 
     private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment 
leftChildFragment,
                                          List<Expr> rhsHashExprs) {
+        if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            return false;
+        }
+
         if 
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
             return false;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index 98d4a86cf7..b926c5147b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -222,7 +222,8 @@ public final class RuntimeFilterGenerator {
             // from the ON clause.
             if (!joinNode.getJoinOp().isLeftOuterJoin()
                     && !joinNode.getJoinOp().isFullOuterJoin()
-                    && 
!joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)) {
+                    && 
!joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)
+                    && 
!joinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
                 joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
             }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
index b0d82ddbaf..2154389c6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
@@ -74,7 +74,9 @@ public class ExprRewriter {
                 case FULL_OUTER_JOIN: return FULL_OUTER_JOIN_CLAUSE;
                 case LEFT_SEMI_JOIN: return LEFT_SEMI_JOIN_CLAUSE;
                 case RIGHT_SEMI_JOIN: return RIGHT_SEMI_JOIN_CLAUSE;
-                case LEFT_ANTI_JOIN: return LEFT_ANTI_JOIN_CLAUSE;
+                case NULL_AWARE_LEFT_ANTI_JOIN:
+                case LEFT_ANTI_JOIN:
+                    return LEFT_ANTI_JOIN_CLAUSE;
                 case RIGHT_ANTI_JOIN: return RIGHT_ANTI_JOIN_CLAUSE;
                 case CROSS_JOIN: return CROSS_JOIN_CLAUSE;
                 default: return OTHER_CLAUSE;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java 
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
index 1574fd4c86..1b92475a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
@@ -457,7 +457,8 @@ public class InferFiltersRule implements ExprRewriteRule {
                     || (joinOperator == JoinOperator.LEFT_SEMI_JOIN)
                     || (!needChange && joinOperator == 
JoinOperator.RIGHT_OUTER_JOIN)
                     || (needChange && (joinOperator == 
JoinOperator.LEFT_OUTER_JOIN
-                    || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
+                    || joinOperator == JoinOperator.LEFT_ANTI_JOIN
+                    || joinOperator == 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
                 ret = true;
             }
         } else if (clauseType == ExprRewriter.ClauseType.WHERE_CLAUSE) {
@@ -465,7 +466,8 @@ public class InferFiltersRule implements ExprRewriteRule {
                     || (joinOperator == JoinOperator.LEFT_SEMI_JOIN
                     || (needChange && joinOperator == 
JoinOperator.RIGHT_OUTER_JOIN))
                     || (!needChange && (joinOperator == 
JoinOperator.LEFT_OUTER_JOIN
-                    || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
+                    || joinOperator == JoinOperator.LEFT_ANTI_JOIN
+                    || joinOperator == 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
                 ret = true;
             }
         }
diff --git 
a/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out 
b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out
new file mode 100644
index 0000000000..d149258eda
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+2
+
+-- !select --
+\N
+2
+
+-- !select --
+
diff --git 
a/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy 
b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
new file mode 100644
index 0000000000..b25e992cad
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_null_aware_left_anti_join") {
+    def tableName1 = "test_null_aware_left_anti_join1"
+    def tableName2 = "test_null_aware_left_anti_join2"
+    sql """
+        drop table if exists ${tableName1};
+    """
+
+    sql """
+        drop table if exists ${tableName2};
+    """
+
+    sql """
+        create table if not exists ${tableName1} ( `k1` int(11) NULL ) 
DISTRIBUTED BY HASH(`k1`) BUCKETS 4         PROPERTIES (         
"replication_num" = "1");
+    """
+
+    sql """
+        create table if not exists ${tableName2} ( `k1` int(11) NULL ) 
DISTRIBUTED BY HASH(`k1`) BUCKETS 4         PROPERTIES (         
"replication_num" = "1");
+    """
+
+    sql """
+        insert into ${tableName1} values (1), (3);
+    """
+
+    sql """
+        insert into ${tableName2} values (1), (2);
+    """
+
+    qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in 
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+    sql """
+        insert into ${tableName2} values(null);
+    """
+
+    qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in 
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+    sql """
+        insert into ${tableName1} values(null);
+    """
+
+    qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in 
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+    sql """
+        drop table if exists ${tableName2};
+    """
+
+    sql """
+        drop table if exists ${tableName1};
+    """
+}
diff --git 
a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy 
b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
index 0314b123ba..b50a8780c4 100644
--- a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
+++ b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
@@ -67,9 +67,9 @@ suite("test_explain_tpch_sf_1_q16") {
                explainStr.contains("VAGGREGATE (update serialize)\n" + 
                                "  |  STREAMING\n" + 
                                "  |  group by: <slot 29>, <slot 30>, <slot 
31>, <slot 27>") && 
-               explainStr.contains("join op: LEFT ANTI JOIN(BROADCAST)[The src 
data has been redistributed]\n" + 
+               explainStr.contains("join op: NULL AWARE LEFT ANTI 
JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]\n" +
                                "  |  equal join conjunct: <slot 21> = 
`s_suppkey`") && 
-               explainStr.contains("vec output tuple id: 8") && 
+               explainStr.contains("vec output tuple id: 8") &&
                explainStr.contains("output slot ids: 27 29 30 31 \n" + 
                                "  |  hash output slot ids: 21 23 24 25 ") && 
                explainStr.contains("join op: INNER JOIN(BROADCAST)[Tables are 
not in the same group]\n" + 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to