This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bfba058ecf [Feature](join) Support null aware left anti join (#13871) bfba058ecf is described below commit bfba058ecf4b3c41ee4d7f5eced22bf9ab8f74a5 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Nov 3 12:11:25 2022 +0800 [Feature](join) Support null aware left anti join (#13871) --- be/src/vec/exec/join/vhash_join_node.cpp | 98 +++++++++++++++------- be/src/vec/exec/join/vhash_join_node.h | 18 ++-- .../org/apache/doris/analysis/JoinOperator.java | 3 +- .../org/apache/doris/analysis/StmtRewriter.java | 8 +- .../java/org/apache/doris/analysis/TableRef.java | 2 + .../apache/doris/nereids/trees/plans/JoinType.java | 13 ++- .../doris/planner/DistributedPlanColocateRule.java | 2 + .../apache/doris/planner/DistributedPlanner.java | 30 +++++-- .../doris/planner/RuntimeFilterGenerator.java | 3 +- .../org/apache/doris/rewrite/ExprRewriter.java | 4 +- .../org/apache/doris/rewrite/InferFiltersRule.java | 6 +- .../test_null_aware_left_anti_join.out | 10 +++ .../test_null_aware_left_anti_join.groovy | 66 +++++++++++++++ .../tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy | 4 +- 14 files changed, 207 insertions(+), 60 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index eb419eb52f..a478df8744 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -59,8 +59,8 @@ struct ProcessHashTableBuild { _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {} template <bool need_null_map_for_build, bool ignore_null, bool build_unique, - bool has_runtime_filter> - void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map) { + bool has_runtime_filter, bool short_circuit_for_null> + void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { using KeyGetter = typename HashTableContext::State; using Mapped = typename HashTableContext::Mapped; 
int64_t old_bucket_bytes = hash_table_ctx.hash_table.get_buffer_size_in_bytes(); @@ -97,6 +97,15 @@ struct ProcessHashTableBuild { continue; } } + // If the short-circuit strategy for null values is applied (e.g. the join operator is + // NULL_AWARE_LEFT_ANTI_JOIN), stop building the hash table as soon as a null key is met. + if constexpr (short_circuit_for_null && need_null_map_for_build) { + if ((*null_map)[k]) { + DCHECK(has_null_key); + *has_null_key = true; + return; + } + } if constexpr (IsSerializedHashTableContextTraits<KeyGetter>::value) { _build_side_hash_values[k] = hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key); @@ -218,6 +227,7 @@ void ProcessHashTableProbe<JoinOpType, ignore_null>::build_side_output_column( constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || @@ -380,7 +390,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType& key_getter.template prefetch<true>(hash_table_ctx.hash_table, probe_index + PREFETCH_STEP, _arena); - if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { if (!find_result.is_found()) { ++current_offset; } @@ -575,7 +586,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi } } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { same_to_prev.emplace_back(false); visited_map.emplace_back(nullptr); // only
full outer / left outer need insert the data of right table @@ -682,16 +694,23 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi output_block->get_by_position(result_column_id).column = std::move(new_filter_column); - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { auto new_filter_column = ColumnVector<UInt8>::create(); auto& filter_map = new_filter_column->get_data(); if (!column->empty()) { + // Both equal conjuncts and other conjuncts are true filter_map.emplace_back(column->get_bool(0) && visited_map[0]); } for (int i = 1; i < column->size(); ++i) { if ((visited_map[i] && column->get_bool(i)) || (same_to_prev[i] && filter_map[i - 1])) { + // When either of the following two conditions is met: + // 1. Both the equal conjuncts and the other conjuncts are true for this row + // 2. same_to_prev[i] is true and the previous row already passed the filter + // set filter_map[i] to true, and if same_to_prev[i] is true, reset + // filter_map[i - 1] to false.
filter_map.push_back(true); filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; } else { @@ -731,8 +750,10 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi output_block->clear(); } else { if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { orig_columns = right_col_idx; + } Block::filter_block(output_block, result_column_id, orig_columns); } } @@ -828,14 +849,16 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::process_data_in_hashtable HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _join_op(tnode.hash_join_node.join_op), - _hash_table_rows(0), _mem_used(0), + _have_other_join_conjunct(tnode.hash_join_node.__isset.vother_join_conjunct), _match_all_probe(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN), - _match_one_build(_join_op == TJoinOp::LEFT_SEMI_JOIN), _match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN), - _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN), + _build_unique(!_have_other_join_conjunct && + (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + _join_op == TJoinOp::LEFT_ANTI_JOIN || + _join_op == TJoinOp::LEFT_SEMI_JOIN)), _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN), _is_outer_join(_match_all_build || _match_all_probe), @@ -874,17 +897,17 @@ void HashJoinNode::init_join_op() { Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); - if (tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - return Status::InternalError("Do not support null aware left anti join"); - } const 
bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN; const bool probe_dispose_null = - _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN; const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; + std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size()); + size_t conjuncts_index = 0; for (const auto& eq_join_conjunct : eq_join_conjuncts) { VExprContext* ctx = nullptr; RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx)); @@ -897,17 +920,20 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _is_null_safe_eq_join.push_back(null_aware); // if is null aware, build join column and probe join column both need dispose null value - _build_not_ignore_null.emplace_back( + _store_null_in_hash_table.emplace_back( null_aware || (_build_expr_ctxs.back()->root()->is_nullable() && build_stores_null)); - _probe_not_ignore_null.emplace_back( + probe_not_ignore_null[conjuncts_index] = null_aware || - (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null)); - _build_side_ignore_null |= !_build_not_ignore_null.back(); + (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null); + _build_side_ignore_null |= (_join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && + !_store_null_in_hash_table.back()); + conjuncts_index++; } for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { - _probe_ignore_null |= !_probe_not_ignore_null[i]; + _probe_ignore_null |= !probe_not_ignore_null[i]; } + _short_circuit_for_null_in_build_side = _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; _probe_column_disguise_null.reserve(eq_join_conjuncts.size()); @@ -918,8 +944,8 @@ Status HashJoinNode::init(const 
TPlanNode& tnode, RuntimeState* state) { // If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate, // build table should not be deduplicated. - _build_unique = false; - _have_other_join_conjunct = true; + DCHECK(!_build_unique); + DCHECK(_have_other_join_conjunct); } const auto& output_exprs = tnode.hash_join_node.srcExprList; @@ -1057,6 +1083,12 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_TIMER(_probe_timer); + if (_short_circuit_for_null_in_probe_side) { + // A null key was met in the build side under the short-circuit strategy (e.g. the join + // operator is NULL_AWARE_LEFT_ANTI_JOIN), so return an empty result directly. + *eos = true; + return Status::OK(); + } size_t probe_rows = _probe_block.rows(); if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) { _probe_index = 0; @@ -1285,7 +1317,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; Block block; - while (!eos) { + // Stop pulling data from the build side once eos is reached or a null value has already + // been met under the short-circuit strategy.
+ while (!eos && !_short_circuit_for_null_in_probe_side) { block.clear_column_data(); RETURN_IF_CANCELLED(state); @@ -1315,7 +1349,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { } } - if (!mutable_block.empty()) { + if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) { if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( strings::Substitute("data size of right table in hash join > $0", @@ -1356,7 +1390,7 @@ Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& DCHECK(null_map != nullptr); VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap); } - if (_build_not_ignore_null[i]) { + if (_store_null_in_hash_table[i]) { raw_ptrs[i] = nullable; } else { if constexpr (BuildSide) { @@ -1400,7 +1434,7 @@ bool HashJoinNode::_need_null_map(Block& block, const std::vector<int>& res_col_ auto column = block.get_by_position(res_col_ids[i]).column.get(); if constexpr (BuildSide) { if (check_and_get_column<ColumnNullable>(*column)) { - if (!_build_not_ignore_null[i]) { + if (!_store_null_in_hash_table[i]) { return true; } } @@ -1434,7 +1468,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin // so we have to initialize this flag by the first build block. 
if (!_has_set_need_null_map_for_build) { _has_set_need_null_map_for_build = true; - _need_null_map_for_build = _need_null_map<true>(block, res_col_ids); + _need_null_map_for_build = + _short_circuit_for_null_in_build_side || _need_null_map<true>(block, res_col_ids); } if (_need_null_map_for_build) { null_map_val = ColumnUInt8::create(); @@ -1458,21 +1493,24 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin std::visit( [&](auto&& arg, auto has_null_value, auto build_unique, auto has_runtime_filter_value, - auto need_null_map_for_build) { + auto need_null_map_for_build, auto short_circuit_for_null_in_build_side) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { ProcessHashTableBuild<HashTableCtxType> hash_table_build_process( rows, block, raw_ptrs, this, state->batch_size(), offset); hash_table_build_process.template run<need_null_map_for_build, has_null_value, - build_unique, has_runtime_filter_value>( - arg, need_null_map_for_build ? &null_map_val->get_data() : nullptr); + build_unique, has_runtime_filter_value, + short_circuit_for_null_in_build_side>( + arg, need_null_map_for_build ? 
&null_map_val->get_data() : nullptr, + &_short_circuit_for_null_in_probe_side); } else { LOG(FATAL) << "FATAL: uninited hash table"; } }, _hash_table_variants, make_bool_variant(_build_side_ignore_null), make_bool_variant(_build_unique), make_bool_variant(has_runtime_filter), - make_bool_variant(_need_null_map_for_build)); + make_bool_variant(_need_null_map_for_build), + make_bool_variant(_short_circuit_for_null_in_build_side)); return st; } @@ -1488,7 +1526,7 @@ void HashJoinNode::_hash_table_init() { JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN, RowRefListWithFlag, RowRefList>>; - if (_build_expr_ctxs.size() == 1 && !_build_not_ignore_null[0]) { + if (_build_expr_ctxs.size() == 1 && !_store_null_in_hash_table[0]) { // Single column optimization switch (_build_expr_ctxs[0]->root()->result_type()) { case TYPE_BOOLEAN: diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 5f84211110..7d29268b39 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -288,7 +288,6 @@ public: Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; - HashTableVariants& get_hash_table_variants() { return _hash_table_variants; } void init_join_op(); const RowDescriptor& row_desc() const override { return _output_row_desc; } @@ -311,10 +310,8 @@ private: // mark the join column whether support null eq std::vector<bool> _is_null_safe_eq_join; - // mark the build hash table whether contain null column - std::vector<bool> _build_not_ignore_null; - // mark the probe table should dispose null column - std::vector<bool> _probe_not_ignore_null; + // mark the build hash table whether it needs to store null value + std::vector<bool> _store_null_in_hash_table; std::vector<uint16_t> _probe_column_disguise_null; 
std::vector<uint16_t> _probe_column_convert_to_null; @@ -343,7 +340,6 @@ private: RuntimeProfile::Counter* _join_filter_timer; - int64_t _hash_table_rows; int64_t _mem_used; Arena _arena; @@ -368,14 +364,20 @@ private: Sizes _probe_key_sz; Sizes _build_key_sz; + bool _have_other_join_conjunct; const bool _match_all_probe; // output all rows coming from the probe input. Full/Left Join - const bool _match_one_build; // match at most one build row to each probe row. Left semi Join const bool _match_all_build; // output all rows coming from the build input. Full/Right Join bool _build_unique; // build a hash table without duplicated rows. Left semi/anti Join const bool _is_right_semi_anti; const bool _is_outer_join; - bool _have_other_join_conjunct = false; + + // For null aware left anti join, we apply a short-circuit strategy: + // 1. Set _short_circuit_for_null_in_build_side to true if the join operator is null aware left anti join. + // 2. In the build phase, stop building the hash table as soon as the first null value is met, and set _short_circuit_for_null_in_probe_side to true. + // 3. In the probe phase, if _short_circuit_for_null_in_probe_side is true, the join node returns an empty block directly. Otherwise, probing continues in the same way as a generic left anti join.
+ bool _short_circuit_for_null_in_build_side = false; + bool _short_circuit_for_null_in_probe_side = false; Block _join_block; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java index 6db551e76d..a01739a78e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java @@ -61,7 +61,8 @@ public enum JoinOperator { } public boolean isSemiAntiJoin() { - return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN; + return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN + || this == NULL_AWARE_LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN; } public boolean isSemiJoin() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index 5cdb4bba36..06416c162b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.TableAliasGenerator; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.policy.RowPolicy; import org.apache.doris.qe.ConnectContext; @@ -755,8 +756,8 @@ public class StmtRewriter { // For the case of a NOT IN with an eq join conjunct, replace the join // conjunct with a conjunct that uses the null-matching eq operator. if (expr instanceof InPredicate) { - // joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN; - joinOp = JoinOperator.LEFT_ANTI_JOIN; + joinOp = VectorizedUtil.isVectorized() + ? 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : JoinOperator.LEFT_ANTI_JOIN; List<TupleId> tIds = Lists.newArrayList(); joinConjunct.getIds(tIds, null); if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) { @@ -804,7 +805,8 @@ public class StmtRewriter { for (int j = 0; j < tableIdx; ++j) { TableRef tableRef = stmt.fromClause.get(j); if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN - || tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN) { + || tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN + || tableRef.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { continue; } newItems.add(SelectListItem.createStarItem(tableRef.getAliasAsName())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 6a8099c719..0aa5d59291 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -640,6 +640,8 @@ public class TableRef implements ParseNode, Writable { return "FULL OUTER JOIN"; case CROSS_JOIN: return "CROSS JOIN"; + case NULL_AWARE_LEFT_ANTI_JOIN: + return "NULL AWARE LEFT ANTI JOIN"; default: return "bad join op: " + joinOp.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java index b9e3d909c1..46eebdf221 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java @@ -37,6 +37,7 @@ public enum JoinType { LEFT_ANTI_JOIN, RIGHT_ANTI_JOIN, CROSS_JOIN, + NULL_AWARE_LEFT_ANTI_JOIN, ; private static final Map<JoinType, JoinType> joinSwapMap = ImmutableMap @@ -71,6 +72,8 @@ public enum JoinType { return JoinOperator.FULL_OUTER_JOIN; case LEFT_ANTI_JOIN: return JoinOperator.LEFT_ANTI_JOIN; + case NULL_AWARE_LEFT_ANTI_JOIN: + return 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN; case RIGHT_ANTI_JOIN: return JoinOperator.RIGHT_ANTI_JOIN; case LEFT_SEMI_JOIN: @@ -97,7 +100,8 @@ public enum JoinType { } public final boolean isLeftJoin() { - return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == LEFT_SEMI_JOIN; + return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this == NULL_AWARE_LEFT_ANTI_JOIN + || this == LEFT_SEMI_JOIN; } public final boolean isRightJoin() { @@ -117,7 +121,7 @@ public enum JoinType { } public final boolean isLeftSemiOrAntiJoin() { - return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN; + return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == NULL_AWARE_LEFT_ANTI_JOIN; } public final boolean isRightSemiOrAntiJoin() { @@ -125,7 +129,8 @@ public enum JoinType { } public final boolean isSemiOrAntiJoin() { - return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN; + return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this == LEFT_ANTI_JOIN + || this == NULL_AWARE_LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN; } public final boolean isOuterJoin() { @@ -137,7 +142,7 @@ public enum JoinType { } public final boolean isRemainRightJoin() { - return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN; + return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN && this != NULL_AWARE_LEFT_ANTI_JOIN; } public final boolean isSwapJoinType() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java index 1ff492d0b6..7d3a8cde41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java @@ -27,4 +27,6 @@ public class DistributedPlanColocateRule { public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group is not stable"; public static final String 
INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY = "Inconsistent distribution of table and queries"; + public static final String NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST + = "Build side of null aware left anti join must be broadcast"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 19e343cc72..d005a6ff3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -353,7 +353,10 @@ public class DistributedPlanner { // - and the expected size of the hash tbl doesn't exceed autoBroadcastThreshold // we set partition join as default when broadcast join cost equals partition join cost - if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) { + if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { + doBroadcast = true; + } else if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN + && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) { if (node.getInnerRef().isBroadcastJoin()) { // respect user join hint doBroadcast = true; @@ -425,26 +428,33 @@ public class DistributedPlanner { /** * Colocate Join can be performed when the following 4 conditions are met at the same time. - * 1. Session variables disable_colocate_plan = false - * 2. There is no join hints in HashJoinNode - * 3. There are no exchange node between source scan node and HashJoinNode. - * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. + * 1. Join operator is not NULL_AWARE_LEFT_ANTI_JOIN + * 2. Session variables disable_colocate_plan = false + * 3. There is no join hints in HashJoinNode + * 4. There are no exchange node between source scan node and HashJoinNode. + * 5. 
The scan nodes which are related by EqConjuncts in HashJoinNode are colocate and group can be matched. */ private boolean canColocateJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment, List<String> cannotReason) { // Condition1 + if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { + cannotReason.add(DistributedPlanColocateRule.NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST); + return false; + } + + // Condition2 if (ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED); return false; } - // Condition2: If user have a join hint to use proper way of join, can not be colocate join + // Condition3: If user have a join hint to use proper way of join, can not be colocate join if (node.getInnerRef().hasJoinHints()) { cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT); return false; } - // Condition3: + // Condition4: // If there is an exchange node between the HashJoinNode and their real associated ScanNode, // it means that the data has been rehashed. 
// The rehashed data can no longer be guaranteed to correspond to the left and right buckets, @@ -468,7 +478,7 @@ public class DistributedPlanner { predicateList.add(eqJoinPredicate); } - // Condition4 + // Condition5 return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts, cannotReason); } @@ -581,6 +591,10 @@ public class DistributedPlanner { private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, List<Expr> rhsHashExprs) { + if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { + return false; + } + if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index 98d4a86cf7..b926c5147b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -222,7 +222,8 @@ public final class RuntimeFilterGenerator { // from the ON clause. 
if (!joinNode.getJoinOp().isLeftOuterJoin() && !joinNode.getJoinOp().isFullOuterJoin() - && !joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)) { + && !joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN) + && !joinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) { joinConjuncts.addAll(joinNode.getEqJoinConjuncts()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java index b0d82ddbaf..2154389c6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java @@ -74,7 +74,9 @@ public class ExprRewriter { case FULL_OUTER_JOIN: return FULL_OUTER_JOIN_CLAUSE; case LEFT_SEMI_JOIN: return LEFT_SEMI_JOIN_CLAUSE; case RIGHT_SEMI_JOIN: return RIGHT_SEMI_JOIN_CLAUSE; - case LEFT_ANTI_JOIN: return LEFT_ANTI_JOIN_CLAUSE; + case NULL_AWARE_LEFT_ANTI_JOIN: + case LEFT_ANTI_JOIN: + return LEFT_ANTI_JOIN_CLAUSE; case RIGHT_ANTI_JOIN: return RIGHT_ANTI_JOIN_CLAUSE; case CROSS_JOIN: return CROSS_JOIN_CLAUSE; default: return OTHER_CLAUSE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java index 1574fd4c86..1b92475a37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java @@ -457,7 +457,8 @@ public class InferFiltersRule implements ExprRewriteRule { || (joinOperator == JoinOperator.LEFT_SEMI_JOIN) || (!needChange && joinOperator == JoinOperator.RIGHT_OUTER_JOIN) || (needChange && (joinOperator == JoinOperator.LEFT_OUTER_JOIN - || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) { + || joinOperator == JoinOperator.LEFT_ANTI_JOIN + || joinOperator == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) { ret = true; } } else if (clauseType == 
ExprRewriter.ClauseType.WHERE_CLAUSE) { @@ -465,7 +466,8 @@ public class InferFiltersRule implements ExprRewriteRule { || (joinOperator == JoinOperator.LEFT_SEMI_JOIN || (needChange && joinOperator == JoinOperator.RIGHT_OUTER_JOIN)) || (!needChange && (joinOperator == JoinOperator.LEFT_OUTER_JOIN - || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) { + || joinOperator == JoinOperator.LEFT_ANTI_JOIN + || joinOperator == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) { ret = true; } } diff --git a/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out new file mode 100644 index 0000000000..d149258eda --- /dev/null +++ b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +2 + +-- !select -- +\N +2 + +-- !select -- + diff --git a/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy new file mode 100644 index 0000000000..b25e992cad --- /dev/null +++ b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_null_aware_left_anti_join") { + def tableName1 = "test_null_aware_left_anti_join1" + def tableName2 = "test_null_aware_left_anti_join2" + sql """ + drop table if exists ${tableName1}; + """ + + sql """ + drop table if exists ${tableName2}; + """ + + sql """ + create table if not exists ${tableName1} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1"); + """ + + sql """ + create table if not exists ${tableName2} ( `k1` int(11) NULL ) DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES ( "replication_num" = "1"); + """ + + sql """ + insert into ${tableName1} values (1), (3); + """ + + sql """ + insert into ${tableName2} values (1), (2); + """ + + qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """ + + sql """ + insert into ${tableName2} values(null); + """ + + qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """ + + sql """ + insert into ${tableName1} values(null); + """ + + qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in (select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """ + + sql """ + drop table if exists ${tableName2}; + """ + + sql """ + drop table if exists ${tableName1}; + """ +} diff --git a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy index 0314b123ba..b50a8780c4 100644 --- a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy +++ b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy @@ -67,9 +67,9 @@ suite("test_explain_tpch_sf_1_q16") { explainStr.contains("VAGGREGATE (update serialize)\n" + " | STREAMING\n" + " | group by: <slot 29>, 
<slot 30>, <slot 31>, <slot 27>") && - explainStr.contains("join op: LEFT ANTI JOIN(BROADCAST)[The src data has been redistributed]\n" + + explainStr.contains("join op: NULL AWARE LEFT ANTI JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]\n" + " | equal join conjunct: <slot 21> = `s_suppkey`") && - explainStr.contains("vec output tuple id: 8") && + explainStr.contains("vec output tuple id: 8") && explainStr.contains("output slot ids: 27 29 30 31 \n" + " | hash output slot ids: 21 23 24 25 ") && explainStr.contains("join op: INNER JOIN(BROADCAST)[Tables are not in the same group]\n" + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org