This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new fd51474649 [feature](planner) mark join to support subquery in disjunction (#14579) (#15291) fd51474649 is described below commit fd514746494360790ecd11000deaf979ce8f759b Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Dec 23 10:56:42 2022 +0800 [feature](planner) mark join to support subquery in disjunction (#14579) (#15291) cherry-pick #14579 --- be/src/vec/exec/join/vjoin_node_base.cpp | 16 ++ be/src/vec/exec/join/vjoin_node_base.h | 1 + be/src/vec/exec/join/vnested_loop_join_node.cpp | 86 ++++++-- be/src/vec/exec/join/vnested_loop_join_node.h | 12 +- .../org/apache/doris/analysis/AnalyticExpr.java | 5 +- .../java/org/apache/doris/analysis/Analyzer.java | 46 +++- .../main/java/org/apache/doris/analysis/Expr.java | 26 ++- .../apache/doris/analysis/FunctionCallExpr.java | 5 +- .../org/apache/doris/analysis/JoinOperator.java | 8 +- .../java/org/apache/doris/analysis/SelectStmt.java | 3 + .../org/apache/doris/analysis/StmtRewriter.java | 244 ++++++++++++++------- .../java/org/apache/doris/analysis/TableRef.java | 25 ++- .../org/apache/doris/planner/JoinNodeBase.java | 12 +- .../apache/doris/planner/NestedLoopJoinNode.java | 1 + .../apache/doris/planner/SingleNodePlanner.java | 16 +- gensrc/thrift/PlanNodes.thrift | 2 + .../correctness/test_subquery_in_disjunction.out | 26 +++ .../test_subquery_in_disjunction.groovy | 83 +++++++ 18 files changed, 486 insertions(+), 131 deletions(-) diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp index 2ebedff219..9befa4f4bb 100644 --- a/be/src/vec/exec/join/vjoin_node_base.cpp +++ b/be/src/vec/exec/join/vjoin_node_base.cpp @@ -48,8 +48,19 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des _join_op == TJoinOp::LEFT_SEMI_JOIN || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN), _is_outer_join(_match_all_build || _match_all_probe), + _is_mark_join(tnode.__isset.nested_loop_join_node + ? (tnode.nested_loop_join_node.__isset.is_mark + ? tnode.nested_loop_join_node.is_mark + : false) + : false), _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { _init_join_op(); + if (_is_mark_join) { + DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN || + _join_op == TJoinOp::CROSS_JOIN) + << "Mark join is only supported for left semi/anti join and cross join but this is " + << _join_op; + } if (tnode.__isset.hash_join_node) { _output_row_desc.reset( new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false})); @@ -83,6 +94,11 @@ void VJoinNodeBase::_construct_mutable_join_block() { _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()}); } } + if (_is_mark_join) { + _join_block.replace_by_position( + _join_block.columns() - 1, + remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column)); + } } Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) { diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index f2bdc6eced..5c73e0a0c6 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -87,6 +87,7 @@ protected: const bool _is_right_semi_anti; const bool _is_left_semi_anti; const bool _is_outer_join; + const bool _is_mark_join; // For null aware left anti join, we apply a short circuit strategy. // 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join. diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 2d3a954903..7c3f29c854 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -225,7 +225,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo _join_block.clear_column_data(); MutableBlock mutable_join_block(&_join_block); - auto& dst_columns = mutable_join_block.mutable_columns(); std::stack<uint16_t> offset_stack; RETURN_IF_ERROR(std::visit( @@ -251,7 +250,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo if constexpr (set_build_side_flag) { offset_stack.push(mutable_join_block.rows()); } - _process_left_child_block(dst_columns, now_process_build_block); + _process_left_child_block(mutable_join_block, now_process_build_block); } while (mutable_join_block.rows() < state->batch_size() && _current_build_pos < _build_blocks.size()); } @@ -261,7 +260,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo Status status = _do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag>( &tmp_block, offset_stack, !_is_left_semi_anti); - _update_tuple_is_null_column(&tmp_block); + _update_additional_flags(&tmp_block); if (!status.OK()) { return status; } @@ -274,15 +273,21 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo if (!_matched_rows_done) { _finalize_current_phase<false, JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>( - dst_columns, state->batch_size()); - _reset_with_next_probe_row(dst_columns); + mutable_join_block, state->batch_size()); + _reset_with_next_probe_row(); } break; } } if (!_matched_rows_done && _current_build_pos == _build_blocks.size()) { - _reset_with_next_probe_row(dst_columns); + if (_is_mark_join && _build_blocks.empty()) { + DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN); + _append_left_data_with_null(mutable_join_block); + _reset_with_next_probe_row(); + break; + } + _reset_with_next_probe_row(); } } if constexpr (!set_probe_side_flag) { @@ -290,7 +295,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo Status status = _do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag>( &tmp_block, offset_stack, !_is_right_semi_anti); - _update_tuple_is_null_column(&tmp_block); + _update_additional_flags(&tmp_block); mutable_join_block = MutableBlock(std::move(tmp_block)); if (!status.OK()) { return status; @@ -299,10 +304,9 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo if constexpr (set_build_side_flag) { if (_matched_rows_done && _output_null_idx_build_side < _build_blocks.size()) { - auto& cols = mutable_join_block.mutable_columns(); _finalize_current_phase<true, JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN>( - cols, state->batch_size()); + mutable_join_block, state->batch_size()); } } return Status::OK(); @@ -328,8 +332,37 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo return Status::OK(); } -void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns, +void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_block) const { + auto& dst_columns = mutable_block.mutable_columns(); + DCHECK(_is_mark_join); + for (size_t i = 0; i < _num_probe_side_columns; ++i) { + const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { + auto origin_sz = dst_columns[i]->size(); + DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast<ColumnNullable*>(dst_columns[i].get()) + ->get_nested_column_ptr() + ->insert_many_from(*src_column.column, _left_block_pos, 1); + assert_cast<ColumnNullable*>(dst_columns[i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(origin_sz + 1, 0); + } else { + dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1); + } + } + for (size_t i = 0; i < _num_build_side_columns; ++i) { + dst_columns[_num_probe_side_columns + i]->insert_default(); + } + IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>( + *dst_columns[dst_columns.size() - 1]) + .get_data(); + mark_data.resize_fill(mark_data.size() + 1, 0); +} + +void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, const Block& now_process_build_block) const { + auto& dst_columns = mutable_block.mutable_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < _num_probe_side_columns; ++i) { const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); @@ -367,7 +400,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns, } } -void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) { +void VNestedLoopJoinNode::_update_additional_flags(Block* block) { if (_is_outer_join) { auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); @@ -383,6 +416,15 @@ void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) { right_null_map.get_data().resize_fill(block->rows(), 0); } } + if (_is_mark_join) { + IColumn::Filter& mark_data = + assert_cast<doris::vectorized::ColumnVector<UInt8>&>( + *block->get_by_position(block->columns() - 1).column->assume_mutable()) + .get_data(); + if (mark_data.size() < block->rows()) { + mark_data.resize_fill(block->rows(), 1); + } + } } void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) { @@ -396,10 +438,12 @@ void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) { } template <bool BuildSide, bool IsSemi> -void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, size_t batch_size) { +void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, size_t batch_size) { + auto& dst_columns = mutable_block.mutable_columns(); DCHECK_GT(dst_columns.size(), 0); auto pre_size = dst_columns[0]->size(); if constexpr (BuildSide) { + DCHECK(!_is_mark_join); auto build_block_sz = _build_blocks.size(); size_t i = _output_null_idx_build_side; for (; i < build_block_sz; i++) { @@ -470,15 +514,26 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s _output_null_idx_build_side = i; } else { if constexpr (IsSemi) { - if (!_cur_probe_row_visited_flags) { + if (!_cur_probe_row_visited_flags && !_is_mark_join) { return; } } else { - if (_cur_probe_row_visited_flags) { + if (_cur_probe_row_visited_flags && !_is_mark_join) { return; } } + if (_is_mark_join) { + IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>( + *dst_columns[dst_columns.size() - 1]) + .get_data(); + mark_data.resize_fill(mark_data.size() + 1, + (IsSemi && !_cur_probe_row_visited_flags) || + (!IsSemi && _cur_probe_row_visited_flags) + ? 0 + : 1); + } + DCHECK_LT(_left_block_pos, _left_block.rows()); for (size_t i = 0; i < _num_probe_side_columns; ++i) { const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); @@ -511,7 +566,8 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s } } -void VNestedLoopJoinNode::_reset_with_next_probe_row(MutableColumns& dst_columns) { +void VNestedLoopJoinNode::_reset_with_next_probe_row() { + // TODO: need a vector of left block to register the _probe_row_visited_flags _cur_probe_row_visited_flags = false; _current_build_pos = 0; _left_block_pos++; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 45644f9e8e..aa63728cce 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -67,7 +67,7 @@ private: // Processes a block from the left child. // dst_columns: left_child_row and now_process_build_block to construct a bundle column of new block // now_process_build_block: right child block now to process - void _process_left_child_block(MutableColumns& dst_columns, + void _process_left_child_block(MutableBlock& mutable_block, const Block& now_process_build_block) const; template <bool SetBuildSideFlag, bool SetProbeSideFlag> @@ -75,19 +75,23 @@ private: bool materialize); template <bool BuildSide, bool IsSemi> - void _finalize_current_phase(MutableColumns& dst_columns, size_t batch_size); + void _finalize_current_phase(MutableBlock& mutable_block, size_t batch_size); - void _reset_with_next_probe_row(MutableColumns& dst_columns); + void _reset_with_next_probe_row(); void _release_mem(); Status get_left_side(RuntimeState* state, Block* block); // add tuple is null flag column to Block for filter conjunct and output expr - void _update_tuple_is_null_column(Block* block); + void _update_additional_flags(Block* block); void _add_tuple_is_null_column(Block* block) override; + // For mark join, if the relation from right side is empty, we should construct intermediate + // block with data from left side and filled with null for right side + void _append_left_data_with_null(MutableBlock& mutable_block) const; + // List of build blocks, constructed in prepare() Blocks _build_blocks; // Visited flags for each row in build side. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java index 36895d8111..a9598a11f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java @@ -910,9 +910,8 @@ public class AnalyticExpr extends Expr { } @Override - protected Expr substituteImpl(ExprSubstitutionMap sMap, Analyzer analyzer) - throws AnalysisException { - Expr e = super.substituteImpl(sMap, analyzer); + protected Expr substituteImpl(ExprSubstitutionMap sMap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) { + Expr e = super.substituteImpl(sMap, disjunctsMap, analyzer); if (!(e instanceof AnalyticExpr)) { return e; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 85289d9044..9c5b2e0be1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -389,6 +389,10 @@ public class Analyzer { private final Map<SlotId, SlotId> equivalentSlots = Maps.newHashMap(); + private final Map<String, TupleDescriptor> markTuples = Maps.newHashMap(); + + private final Map<TableRef, TupleId> markTupleIdByInnerRef = Maps.newHashMap(); + public GlobalState(Env env, ConnectContext context) { this.env = env; this.context = context; @@ -649,6 +653,14 @@ public class Analyzer { tableRefMap.put(result.getId(), ref); + // for mark join + if (ref.getJoinOp() != null && ref.isMark()) { + TupleDescriptor markTuple = getDescTbl().createTupleDescriptor(); + markTuple.setAliases(new String[]{ref.getMarkTupleName()}, true); + globalState.markTuples.put(ref.getMarkTupleName(), markTuple); + globalState.markTupleIdByInnerRef.put(ref, markTuple.getId()); + } + return result; } @@ -865,7 +877,7 @@ public class Analyzer { newTblName == null ? "table list" : newTblName.toString()); } - Column col = d.getTable().getColumn(colName); + Column col = d.getTable() == null ? new Column(colName, ScalarType.BOOLEAN) : d.getTable().getColumn(colName); if (col == null) { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName, newTblName == null ? d.getTable().getName() : newTblName.toString()); @@ -937,7 +949,7 @@ public class Analyzer { } } - return result; + return result != null ? result : globalState.markTuples.get(tblName.toString()); } private TupleDescriptor resolveColumnRef(String colName) throws AnalysisException { @@ -1517,6 +1529,36 @@ public class Analyzer { return result; } + public List<Expr> getMarkConjuncts(TableRef ref) { + TupleId id = globalState.markTupleIdByInnerRef.get(ref); + if (id == null) { + return Collections.emptyList(); + } + return getAllConjuncts(id); + } + + public TupleDescriptor getMarkTuple(TableRef ref) { + TupleDescriptor markTuple = globalState.descTbl.getTupleDesc(globalState.markTupleIdByInnerRef.get(ref)); + if (markTuple != null) { + markTuple.setIsMaterialized(true); + markTuple.getSlots().forEach(s -> s.setIsMaterialized(true)); + } + return markTuple; + } + + public List<Expr> getMarkConjuncts() { + List<Expr> exprs = Lists.newArrayList(); + List<TupleId> markIds = Lists.newArrayList(globalState.markTupleIdByInnerRef.values()); + for (Expr e : globalState.conjuncts.values()) { + List<TupleId> tupleIds = Lists.newArrayList(); + e.getIds(tupleIds, null); + if (!Collections.disjoint(markIds, tupleIds)) { + exprs.add(e); + } + } + return exprs; + } + /** * Get all predicates belonging to one or more tuples that have not yet been assigned * Since these predicates will be assigned by upper-level plan nodes in the future, diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index e92bc720cf..79df8f3395 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -691,12 +691,17 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl */ public Expr trySubstitute(ExprSubstitutionMap smap, Analyzer analyzer, boolean preserveRootType) throws AnalysisException { + return trySubstitute(smap, null, analyzer, preserveRootType); + } + + public Expr trySubstitute(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer, + boolean preserveRootType) throws AnalysisException { Expr result = clone(); // Return clone to avoid removing casts. if (smap == null) { return result; } - result = result.substituteImpl(smap, analyzer); + result = result.substituteImpl(smap, disjunctsMap, analyzer); result.analyze(analyzer); if (preserveRootType && !type.equals(result.getType())) { result = result.castTo(type); @@ -717,8 +722,14 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl */ public Expr substitute(ExprSubstitutionMap smap, Analyzer analyzer, boolean preserveRootType) throws AnalysisException { + return substitute(smap, null, analyzer, preserveRootType); + } + + public Expr substitute(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, + Analyzer analyzer, boolean preserveRootType) + throws AnalysisException { try { - return trySubstitute(smap, analyzer, preserveRootType); + return trySubstitute(smap, disjunctsMap, analyzer, preserveRootType); } catch (AnalysisException e) { throw e; } catch (Exception e) { @@ -755,10 +766,9 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl * Exprs that have non-child exprs which should be affected by substitutions must * override this method and apply the substitution to such exprs as well. */ - protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) - throws AnalysisException { + protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) { if (isImplicitCast()) { - return getChild(0).substituteImpl(smap, analyzer); + return getChild(0).substituteImpl(smap, disjunctsMap, analyzer); } if (smap != null) { Expr substExpr = smap.get(this); @@ -766,8 +776,12 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl return substExpr.clone(); } } + if (Expr.IS_OR_PREDICATE.apply(this) && disjunctsMap != null) { + smap = disjunctsMap; + disjunctsMap = null; + } for (int i = 0; i < children.size(); ++i) { - children.set(i, children.get(i).substituteImpl(smap, analyzer)); + children.set(i, children.get(i).substituteImpl(smap, disjunctsMap, analyzer)); } // SlotRefs must remain analyzed to support substitution across query blocks. All // other exprs must be analyzed again after the substitution to add implicit casts diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 937aeb7437..d20626e4c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -377,8 +377,7 @@ public class FunctionCallExpr extends Expr { } @Override - protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) - throws AnalysisException { + protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) { if (aggFnParams != null && aggFnParams.exprs() != null) { ArrayList<Expr> newParams = new ArrayList<Expr>(); for (Expr expr : aggFnParams.exprs()) { @@ -392,7 +391,7 @@ public class FunctionCallExpr extends Expr { aggFnParams = aggFnParams .clone(newParams); } - return super.substituteImpl(smap, analyzer); + return super.substituteImpl(smap, disjunctsMap, analyzer); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java index a01739a78e..de33961db1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java @@ -37,12 +37,12 @@ public enum JoinOperator { // NOT IN subqueries. It can have a single equality join conjunct // that returns TRUE when the rhs is NULL. NULL_AWARE_LEFT_ANTI_JOIN("NULL AWARE LEFT ANTI JOIN", - TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN); + TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN); private final String description; private final TJoinOp thriftJoinOp; - private JoinOperator(String description, TJoinOp thriftJoinOp) { + JoinOperator(String description, TJoinOp thriftJoinOp) { this.description = description; this.thriftJoinOp = thriftJoinOp; } @@ -72,11 +72,11 @@ public enum JoinOperator { } public boolean isLeftSemiJoin() { - return this == LEFT_SEMI_JOIN; + return this.thriftJoinOp == TJoinOp.LEFT_SEMI_JOIN; } public boolean isInnerJoin() { - return this == INNER_JOIN; + return this.thriftJoinOp == TJoinOp.INNER_JOIN; } public boolean isAntiJoin() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 36ea44680a..26a0eb5e8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -711,6 +711,9 @@ public class SelectStmt extends QueryStmt { List<Expr> baseTblJoinConjuncts = Expr.trySubstituteList(unassignedJoinConjuncts, baseTblSmap, analyzer, false); analyzer.materializeSlots(baseTblJoinConjuncts); + List<Expr> markConjuncts = analyzer.getMarkConjuncts(); + markConjuncts = Expr.trySubstituteList(markConjuncts, baseTblSmap, analyzer, false); + analyzer.materializeSlots(markConjuncts); if (evaluateOrderBy) { // mark ordering exprs before marking agg/analytic exprs because they could contain diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index 92be4011e8..f837bf6af5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -20,8 +20,10 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -106,11 +108,13 @@ public class StmtRewriter { if (result.hasWhereClause()) { // Push negation to leaf operands. result.whereClause = Expr.pushNegationToOperands(result.whereClause); - // Check if we can equal the subqueries in the WHERE clause. OR predicates with - // subqueries are not supported. - if (hasSubqueryInDisjunction(result.whereClause)) { - throw new AnalysisException("Subqueries in OR predicates are not supported: " - + result.whereClause.toSql()); + if (ConnectContext.get() == null || !ConnectContext.get().getSessionVariable().enableVectorizedEngine()) { + // Check if we can equal the subqueries in the WHERE clause. OR predicates with + // subqueries are not supported. + if (hasSubqueryInDisjunction(result.whereClause)) { + throw new AnalysisException("Subqueries in OR predicates are not supported: " + + result.whereClause.toSql()); + } } rewriteWhereClauseSubqueries(result, analyzer); } @@ -355,6 +359,24 @@ public class StmtRewriter { return false; } + private static void extractExprWithSubquery(boolean inDisjunct, Expr expr, + List<Expr> subqueryExprInConjunct, List<Expr> subqueryExprInDisjunct) { + if (!(expr instanceof CompoundPredicate)) { + if (expr.contains(Subquery.class)) { + if (inDisjunct) { + subqueryExprInDisjunct.add(expr); + } else { + subqueryExprInConjunct.add(expr); + } + } + } else { + for (Expr child : expr.getChildren()) { + extractExprWithSubquery(inDisjunct || Expr.IS_OR_PREDICATE.apply(expr), child, + subqueryExprInConjunct, subqueryExprInDisjunct); + } + } + } + /** * Rewrite all subqueries of a stmt's WHERE clause. Initially, all the * conjuncts containing subqueries are extracted from the WHERE clause and are @@ -408,59 +430,42 @@ public class StmtRewriter { * ON $a$1.a = T1.a * WHERE T1.c < 10; */ - private static void rewriteWhereClauseSubqueries( - SelectStmt stmt, Analyzer analyzer) + // TODO(mark join) need support mark join + private static void rewriteWhereClauseSubqueries(SelectStmt stmt, Analyzer analyzer) throws AnalysisException { int numTableRefs = stmt.fromClause.size(); - ArrayList<Expr> exprsWithSubqueries = Lists.newArrayList(); - ExprSubstitutionMap smap = new ExprSubstitutionMap(); + ArrayList<Expr> exprsWithSubqueriesInConjuncts = Lists.newArrayList(); + ArrayList<Expr> exprsWithSubqueriesInDisjuncts = Lists.newArrayList(); + ExprSubstitutionMap conjunctsSmap = new ExprSubstitutionMap(); + ExprSubstitutionMap disjunctsSmap = new ExprSubstitutionMap(); + List<TupleDescriptor> markTuples = Lists.newArrayList(); + List<Expr> subqueryInConjunct = Lists.newArrayList(); + List<Expr> subqueryInDisjunct = Lists.newArrayList(); // Check if all the conjuncts in the WHERE clause that contain subqueries // can currently be rewritten as a join. - for (Expr conjunct : stmt.whereClause.getConjuncts()) { - List<Subquery> subqueries = Lists.newArrayList(); - conjunct.collectAll(Predicates.instanceOf(Subquery.class), subqueries); - if (subqueries.size() == 0) { - continue; - } - if (subqueries.size() > 1) { - throw new AnalysisException("Multiple subqueries are not supported in " - + "expression: " + conjunct.toSql()); - } - if (!(conjunct instanceof InPredicate) - && !(conjunct instanceof ExistsPredicate) - && !(conjunct instanceof BinaryPredicate) - && !conjunct.contains(Expr.IS_SCALAR_SUBQUERY)) { - throw new AnalysisException("Non-scalar subquery is not supported in " - + "expression: " - + conjunct.toSql()); - } - - if (conjunct instanceof ExistsPredicate) { - // Check if we can determine the result of an ExistsPredicate during analysis. - // If so, replace the predicate with a BoolLiteral predicate and remove it from - // the list of predicates to be rewritten. - BoolLiteral boolLiteral = replaceExistsPredicate((ExistsPredicate) conjunct); - if (boolLiteral != null) { - boolLiteral.analyze(analyzer); - smap.put(conjunct, boolLiteral); - continue; - } - } - - // Replace all the supported exprs with subqueries with true BoolLiterals - // using an smap. - BoolLiteral boolLiteral = new BoolLiteral(true); - boolLiteral.analyze(analyzer); - smap.put(conjunct, boolLiteral); - exprsWithSubqueries.add(conjunct); + // TODO(mark join) traverse expr tree to process subquery. + extractExprWithSubquery(false, stmt.whereClause, subqueryInConjunct, subqueryInDisjunct); + for (Expr conjunct : subqueryInConjunct) { + processOneSubquery(stmt, exprsWithSubqueriesInConjuncts, + conjunctsSmap, markTuples, conjunct, analyzer, false); } - stmt.whereClause = stmt.whereClause.substitute(smap, analyzer, false); + for (Expr conjunct : subqueryInDisjunct) { + processOneSubquery(stmt, exprsWithSubqueriesInDisjuncts, + disjunctsSmap, markTuples, conjunct, analyzer, true); + } + stmt.whereClause = stmt.whereClause.substitute(conjunctsSmap, disjunctsSmap, analyzer, false); boolean hasNewVisibleTuple = false; // Recursively equal all the exprs that contain subqueries and merge them // with 'stmt'. - for (Expr expr : exprsWithSubqueries) { - if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer)) { + for (Expr expr : exprsWithSubqueriesInConjuncts) { + if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, null)) { + hasNewVisibleTuple = true; + } + } + for (int i = 0; i < exprsWithSubqueriesInDisjuncts.size(); i++) { + Expr expr = exprsWithSubqueriesInDisjuncts.get(i); + if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, markTuples.get(i))) { hasNewVisibleTuple = true; } } @@ -472,6 +477,65 @@ public class StmtRewriter { } } + private static void processOneSubquery(SelectStmt stmt, + List<Expr> exprsWithSubqueries, ExprSubstitutionMap smap, List<TupleDescriptor> markTuples, + Expr exprWithSubquery, Analyzer analyzer, boolean isMark) throws AnalysisException { + List<Subquery> subqueries = Lists.newArrayList(); + exprWithSubquery.collectAll(Predicates.instanceOf(Subquery.class), subqueries); + if (subqueries.size() == 0) { + return; + } + if (subqueries.size() > 1) { + throw new AnalysisException("Multiple subqueries are not supported in " + + "expression: " + exprWithSubquery.toSql()); + } + if (!(exprWithSubquery instanceof InPredicate) + && !(exprWithSubquery instanceof ExistsPredicate) + && !(exprWithSubquery instanceof BinaryPredicate) + && !exprWithSubquery.contains(Expr.IS_SCALAR_SUBQUERY)) { + throw new AnalysisException("Non-scalar subquery is not supported in " + + "expression: " + + exprWithSubquery.toSql()); + } + + if (exprWithSubquery instanceof ExistsPredicate) { + // Check if we can determine the result of an ExistsPredicate during analysis. + // If so, replace the predicate with a BoolLiteral predicate and remove it from + // the list of predicates to be rewritten. + BoolLiteral boolLiteral = replaceExistsPredicate((ExistsPredicate) exprWithSubquery); + if (boolLiteral != null) { + boolLiteral.analyze(analyzer); + smap.put(exprWithSubquery, boolLiteral); + return; + } + } + + // Replace all the supported exprs with subqueries with true BoolLiterals + // using a smap. + if (isMark) { + // TODO(mark join) if need mark join, we should replace a SlotRef instead of BoolLiteral + TupleDescriptor markTuple = analyzer.getDescTbl().createTupleDescriptor(); + markTuple.setAliases(new String[]{stmt.getTableAliasGenerator().getNextAlias()}, true); + SlotDescriptor markSlot = analyzer.addSlotDescriptor(markTuple); + String slotName = stmt.getColumnAliasGenerator().getNextAlias(); + markSlot.setType(ScalarType.BOOLEAN); + markSlot.setIsMaterialized(true); + markSlot.setIsNullable(false); + markSlot.setColumn(new Column(slotName, ScalarType.BOOLEAN)); + SlotRef markRef = new SlotRef(markSlot); + markRef.setTblName(new TableName(null, null, markTuple.getAlias())); + markRef.setLabel(slotName); + smap.put(exprWithSubquery, markRef); + markTuples.add(markTuple); + exprsWithSubqueries.add(exprWithSubquery); + } else { + BoolLiteral boolLiteral = new BoolLiteral(true); + boolLiteral.analyze(analyzer); + smap.put(exprWithSubquery, boolLiteral); + exprsWithSubqueries.add(exprWithSubquery); + } + } + /** * Replace an ExistsPredicate that contains a subquery with a BoolLiteral if we @@ -549,7 +613,7 @@ public class StmtRewriter { * @throws AnalysisException */ private static boolean mergeExpr(SelectStmt stmt, Expr expr, - Analyzer analyzer) throws AnalysisException { + Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException { // LOG.warn("dhc mergeExpr stmt={} expr={}", stmt, expr); LOG.debug("SUBQUERY mergeExpr stmt={} expr={}", stmt.toSql(), expr.toSql()); Preconditions.checkNotNull(expr); @@ -564,7 +628,7 @@ public class StmtRewriter { // to eliminate any chance that column aliases from the parent query could reference // select items from the inline view after the equal. List<String> colLabels = Lists.newArrayList(); - // add a new alias for all of columns in subquery + // add a new alias for all columns in subquery for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) { colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias()); } @@ -598,7 +662,7 @@ public class StmtRewriter { } /* - * Situation: The expr is a uncorrelated subquery for outer stmt. + * Situation: The expr is an uncorrelated subquery for outer stmt. * Rewrite: Add a limit 1 for subquery. * origin stmt: select * from t1 where exists (select * from table2); * expr: exists (select * from table2) @@ -612,7 +676,7 @@ public class StmtRewriter { } // Analyzing the inline view trigger reanalysis of the subquery's select statement. - // However the statement is already analyzed and since statement analysis is not + // However, the statement is already analyzed and since statement analysis is not // idempotent, the analysis needs to be reset (by a call to clone()). // inlineView = (InlineViewRef) inlineView.clone(); inlineView.reset(); @@ -627,8 +691,7 @@ public class StmtRewriter { JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN; // Create a join conjunct from the expr that contains a subquery. - Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer, - !onClauseConjuncts.isEmpty()); + Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer, !onClauseConjuncts.isEmpty()); if (joinConjunct != null) { SelectListItem firstItem = ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0); @@ -641,8 +704,16 @@ public class StmtRewriter { // by the subquery due to some predicate. The new join conjunct is added to // stmt's WHERE clause because it needs to be applied to the result of the // LEFT OUTER JOIN (both matched and unmatched tuples). - stmt.whereClause = - CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause); + if (markTuple != null) { + // replace + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + smap.put(new SlotRef(markTuple.getSlots().get(0)), joinConjunct); + stmt.whereClause.substitute(smap); + markTuple = null; + } else { + stmt.whereClause = + CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause); + } joinConjunct = null; joinOp = JoinOperator.LEFT_OUTER_JOIN; updateSelectList = true; @@ -666,7 +737,9 @@ public class StmtRewriter { // subquery using a CROSS JOIN. // TODO This is very expensive. Remove it when we implement independent // subquery evaluation. - inlineView.setJoinOp(JoinOperator.CROSS_JOIN); + joinOp = JoinOperator.CROSS_JOIN; + inlineView.setMark(markTuple); + inlineView.setJoinOp(joinOp); LOG.warn("uncorrelated subquery rewritten using a cross join"); // Indicate that new visible tuples may be added in stmt's select list. return true; @@ -732,10 +805,19 @@ public class StmtRewriter { joinOp = JoinOperator.CROSS_JOIN; // We can equal the aggregate subquery using a cross join. All conjuncts // that were extracted from the subquery are added to stmt's WHERE clause. - stmt.whereClause = - CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause); + if (markTuple != null) { + // replace + ExprSubstitutionMap markSmap = new ExprSubstitutionMap(); + markSmap.put(new SlotRef(markTuple.getSlots().get(0)), onClausePredicate); + stmt.whereClause.substitute(markSmap); + markTuple = null; + } else { + stmt.whereClause = + CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause); + } } + inlineView.setMark(markTuple); inlineView.setJoinOp(joinOp); if (joinOp != JoinOperator.CROSS_JOIN) { inlineView.setOnClause(onClausePredicate); @@ -752,7 +834,10 @@ public class StmtRewriter { && ((ExistsPredicate) expr).isNotExists()) { // For the case of a NOT IN with an eq join conjunct, replace the join // conjunct with a conjunct that uses the null-matching eq operator. - if (expr instanceof InPredicate) { + // TODO: mark join only works on nested loop join now, and NLJ do NOT support NULL_AWARE_LEFT_ANTI_JOIN + // remove markTuple == null when nested loop join support NULL_AWARE_LEFT_ANTI_JOIN + // or plan mark join on hash join + if (expr instanceof InPredicate && markTuple == null) { joinOp = VectorizedUtil.isVectorized() ? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : JoinOperator.LEFT_ANTI_JOIN; List<TupleId> tIds = Lists.newArrayList(); @@ -777,6 +862,7 @@ public class StmtRewriter { } } + inlineView.setMark(markTuple); inlineView.setJoinOp(joinOp); inlineView.setOnClause(onClausePredicate); return updateSelectList; @@ -850,8 +936,7 @@ public class StmtRewriter { ArrayList<Expr> correlatedPredicates = Lists.newArrayList(); if (subqueryStmt.hasWhereClause()) { - if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(), - subqueryTupleIds)) { + if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(), subqueryTupleIds)) { throw new AnalysisException("Disjunctions with correlated predicates " + "are not supported: " + subqueryStmt.getWhereClause().toSql()); } @@ -898,15 +983,13 @@ public class StmtRewriter { * replace them with true BoolLiterals. The modified expr tree is returned * and the extracted correlated predicates are added to 'matches'. */ - private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds, - ArrayList<Expr> matches) { + private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds, ArrayList<Expr> matches) { if (isCorrelatedPredicate(root, tupleIds)) { matches.add(root); return new BoolLiteral(true); } for (int i = 0; i < root.getChildren().size(); ++i) { - root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, - matches)); + root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, matches)); } return root; } @@ -933,7 +1016,7 @@ public class StmtRewriter { /** * Checks if an expr containing a correlated subquery is eligible for equal by - * tranforming into a join. 'correlatedPredicates' contains the correlated + * transforming into a join. 'correlatedPredicates' contains the correlated * predicates identified in the subquery. Throws an AnalysisException if 'expr' * is not eligible for equal. * TODO: Merge all the equal eligibility tests into a single function. @@ -953,26 +1036,19 @@ public class StmtRewriter { SelectListItem item = stmt.getSelectList().getItems().get(0); if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) { throw new AnalysisException("The select item in correlated subquery of binary predicate should only " - + "be sum, min, max, avg and count. Current subquery:" - + stmt.toSql()); + + "be sum, min, max, avg and count. Current subquery:" + stmt.toSql()); } } // Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in // predicate. if (expr instanceof InPredicate && (stmt.hasAggInfo() || stmt.hasAnalyticInfo())) { LOG.warn("canRewriteCorrelatedSubquery fail, expr={} subquery={}", expr.toSql(), stmt.toSql()); - throw new AnalysisException("Unsupported correlated subquery with grouping " - + "and/or aggregation: " - + stmt.toSql()); + throw new AnalysisException("Unsupported correlated subquery" + + " with grouping and/or aggregation: " + stmt.toSql()); } final com.google.common.base.Predicate<Expr> isSingleSlotRef = - new com.google.common.base.Predicate<Expr>() { - @Override - public boolean apply(Expr arg) { - return arg.unwrapSlotRef(false) != null; - } - }; + arg -> arg.unwrapSlotRef(false) != null; // A HAVING clause is only allowed on correlated EXISTS subqueries with // correlated binary predicates of the form Slot = Slot (see IMPALA-2734) @@ -1099,7 +1175,7 @@ public class StmtRewriter { * the aggregate function is wrapped into a 'zeroifnull' function. */ private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView, - Analyzer analyzer, boolean isCorrelated) throws AnalysisException { + Analyzer analyzer, boolean isCorrelated) throws AnalysisException { Preconditions.checkNotNull(exprWithSubquery); Preconditions.checkNotNull(inlineView); Preconditions.checkState(exprWithSubquery.contains(Subquery.class)); @@ -1134,8 +1210,8 @@ public class StmtRewriter { ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0); if (isCorrelated && item.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) { // TODO: Add support for multiple agg functions that return non-null on an - // empty input, by wrapping them with zeroifnull functions before the inline - // view is analyzed. + // empty input, by wrapping them with zeroifnull functions before the inline + // view is analyzed. if (!Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr()) && (!(item.getExpr() instanceof CastExpr) || !Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr().getChild(0)))) { @@ -1149,12 +1225,12 @@ public class StmtRewriter { // TODO Generalize this by making the aggregate functions aware of the // literal expr that they return on empty input, e.g. max returns a // NullLiteral whereas count returns a NumericLiteral. - if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isNumericType()) { + if (aggFns.get(0).getFn().getReturnType().isNumericType()) { FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull", Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT))); zeroIfNull.analyze(analyzer); subquerySubstitute = zeroIfNull; - } else if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isStringType()) { + } else if (aggFns.get(0).getFn().getReturnType().isStringType()) { List<Expr> params = Lists.newArrayList(); params.add(slotRef); params.add(new StringLiteral("")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 5e6dafc6ce..d960587b13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -91,6 +91,8 @@ public class TableRef implements ParseNode, Writable { // Indicates whether this table ref is given an explicit alias, protected boolean hasExplicitAlias; protected JoinOperator joinOp; + protected boolean isMark; + protected String markTupleName; protected List<String> usingColNames; protected ArrayList<LateralViewRef> lateralViewRefs; protected Expr onClause; @@ -175,6 +177,8 @@ public class TableRef implements ParseNode, Writable { aliases = other.aliases; hasExplicitAlias = other.hasExplicitAlias; joinOp = other.joinOp; + isMark = other.isMark; + markTupleName = other.markTupleName; // NOTE: joinHints and sortHints maybe changed after clone. so we new one List. joinHints = (other.joinHints != null) ? Lists.newArrayList(other.joinHints) : null; @@ -261,6 +265,23 @@ public class TableRef implements ParseNode, Writable { this.joinOp = op; } + public boolean isMark() { + return isMark; + } + + public String getMarkTupleName() { + return markTupleName; + } + + public void setMark(TupleDescriptor markTuple) { + this.isMark = markTuple != null; + if (isMark) { + this.markTupleName = markTuple.getAlias(); + } else { + this.markTupleName = null; + } + } + public Expr getOnClause() { return onClause; } @@ -651,7 +672,7 @@ public class TableRef implements ParseNode, Writable { case NULL_AWARE_LEFT_ANTI_JOIN: return "NULL AWARE LEFT ANTI JOIN"; default: - return "bad join op: " + joinOp.toString(); + return "bad join op: " + joinOp; } } @@ -792,6 +813,8 @@ public class TableRef implements ParseNode, Writable { */ protected void setJoinAttrs(TableRef other) { this.joinOp = other.joinOp; + this.isMark = other.isMark; + this.markTupleName = other.markTupleName; this.joinHints = other.joinHints; // this.tableHints_ = other.tableHints_; this.onClause = other.onClause; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java index d1e94c0f9f..963a4feda0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java @@ -110,6 +110,7 @@ public abstract class JoinNodeBase extends PlanNode { } protected void computeOutputTuple(Analyzer analyzer) throws UserException { + // TODO(mark join) if it is mark join use mark tuple instead? // 1. create new tuple vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); boolean copyLeft = false; @@ -338,13 +339,16 @@ public abstract class JoinNodeBase extends PlanNode { protected abstract void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap); - protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException { + protected void computeIntermediateTuple(Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException { // 1. create new tuple TupleDescriptor vIntermediateLeftTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); TupleDescriptor vIntermediateRightTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); vIntermediateTupleDescList = new ArrayList<>(); vIntermediateTupleDescList.add(vIntermediateLeftTupleDesc); vIntermediateTupleDescList.add(vIntermediateRightTupleDesc); + if (markTuple != null) { + vIntermediateTupleDescList.add(markTuple); + } boolean leftNullable = false; boolean rightNullable = false; @@ -446,7 +450,11 @@ public abstract class JoinNodeBase extends PlanNode { public void finalize(Analyzer analyzer) throws UserException { super.finalize(analyzer); if (VectorizedUtil.isVectorized()) { - computeIntermediateTuple(analyzer); + TupleDescriptor markTuple = null; + if (innerRef != null) { + markTuple = analyzer.getMarkTuple(innerRef); + } + computeIntermediateTuple(analyzer, markTuple); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 7a34084ec8..6adb64cd4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -176,6 +176,7 @@ public class NestedLoopJoinNode extends JoinNodeBase { if (vJoinConjunct != null) { msg.nested_loop_join_node.setVjoinConjunct(vJoinConjunct.treeToThrift()); } + msg.nested_loop_join_node.setIsMark(innerRef != null && innerRef.isMark()); if (vSrcToOutputSMap != null) { for (int i = 0; i < vSrcToOutputSMap.size(); i++) { // TODO: Enable it after we support new optimizers diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 5e1973aaed..fd24403179 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -931,8 +931,7 @@ public class SingleNodePlanner { * subplan ref are materialized by a join node added during plan generation. */ // (ML): change the function name - private PlanNode createJoinPlan(Analyzer analyzer, - TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans) + private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans) throws UserException { LOG.debug("Try to create a query plan starting with " + leftmostRef.getUniqueAlias()); @@ -2068,18 +2067,21 @@ public class SingleNodePlanner { ojConjuncts = analyzer.getUnassignedConjuncts(tupleIds, false); } analyzer.markConjunctsAssigned(ojConjuncts); - if (eqJoinConjuncts.isEmpty()) { + if (eqJoinConjuncts.isEmpty() || innerRef.isMark()) { NestedLoopJoinNode result = new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner, innerRef); - result.setJoinConjuncts(ojConjuncts); + List<Expr> joinConjuncts = Lists.newArrayList(eqJoinConjuncts); + joinConjuncts.addAll(ojConjuncts); + result.setJoinConjuncts(joinConjuncts); + result.addConjuncts(analyzer.getMarkConjuncts(innerRef)); result.init(analyzer); return result; } - HashJoinNode result = - new HashJoinNode(ctx.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts, - ojConjuncts); + HashJoinNode result = new HashJoinNode(ctx.getNextNodeId(), outer, inner, + innerRef, eqJoinConjuncts, ojConjuncts); result.init(analyzer); + result.addConjuncts(analyzer.getMarkConjuncts(innerRef)); return result; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7cadebd8f6..b14610587a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -623,6 +623,8 @@ struct TNestedLoopJoinNode { 5: optional bool is_output_left_side_only 6: optional Exprs.TExpr vjoin_conjunct + + 7: optional bool is_mark } struct TMergeJoinNode { diff --git a/regression-test/data/correctness/test_subquery_in_disjunction.out b/regression-test/data/correctness/test_subquery_in_disjunction.out new file mode 100644 index 0000000000..4259f027ea --- /dev/null +++ b/regression-test/data/correctness/test_subquery_in_disjunction.out @@ -0,0 +1,26 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !in -- +1 2 3 +10 20 30 + +-- !scalar -- +1 2 3 +100 200 300 + +-- !exists_true -- +1 2 3 +10 20 30 +100 200 300 + +-- !in_exists_false -- +1 2 3 + +-- !not_in -- +1 2 3 +10 20 30 +100 200 300 + +-- !not_in_covered -- +1 2 3 +100 200 300 + diff --git a/regression-test/suites/correctness/test_subquery_in_disjunction.groovy b/regression-test/suites/correctness/test_subquery_in_disjunction.groovy new file mode 100644 index 0000000000..70b280addd --- /dev/null +++ b/regression-test/suites/correctness/test_subquery_in_disjunction.groovy @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_subquery_in_disjunction") { + sql """ DROP TABLE IF EXISTS test_sq_dj1 """ + sql """ DROP TABLE IF EXISTS test_sq_dj2 """ + sql """ + CREATE TABLE `test_sq_dj1` ( + `c1` int(11) NULL, + `c2` int(11) NULL, + `c3` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + sql """ + CREATE TABLE `test_sq_dj2` ( + `c1` int(11) NULL, + `c2` int(11) NULL, + `c3` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + sql """ + insert into test_sq_dj1 values(1, 2, 3), (10, 20, 30), (100, 200, 300) + """ + sql """ + insert into test_sq_dj2 values(10, 20, 30) + """ + + order_qt_in """ + SELECT * FROM test_sq_dj1 WHERE c1 IN (SELECT c1 FROM test_sq_dj2) OR c1 < 10; + """ + + order_qt_scalar """ + SELECT * FROM test_sq_dj1 WHERE c1 > (SELECT AVG(c1) FROM test_sq_dj2) OR c1 < 10; + """ + + order_qt_exists_true """ + SELECT * FROM test_sq_dj1 WHERE EXISTS (SELECT c1 FROM test_sq_dj2 WHERE c1 = 10) OR c1 < 10; + """ + + order_qt_in_exists_false """ + SELECT * FROM test_sq_dj1 WHERE EXISTS (SELECT c1 FROM test_sq_dj2 WHERE c1 > 10) OR c1 < 10; + """ + + order_qt_not_in """ + SELECT * FROM test_sq_dj1 WHERE c1 NOT IN (SELECT c1 FROM test_sq_dj2) OR c1 = 10; + """ + + order_qt_not_in_covered """ + SELECT * FROM test_sq_dj1 WHERE c1 NOT IN (SELECT c1 FROM test_sq_dj2) OR c1 = 100; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org