This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eebfbd0c91 Revert "[fix](vectorized) Support outer join for vectorized exec engine (#10323)" (#10424) eebfbd0c91 is described below commit eebfbd0c91588a9448612a3df93f5aeab85a9458 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Sat Jun 25 22:18:08 2022 +0800 Revert "[fix](vectorized) Support outer join for vectorized exec engine (#10323)" (#10424) This reverts commit 2cc670dba697a330358ae7d485d856e4b457c679. --- be/src/exec/exec_node.cpp | 4 +- be/src/exec/exec_node.h | 2 +- be/src/vec/columns/column_nullable.cpp | 7 - be/src/vec/columns/column_nullable.h | 1 - be/src/vec/core/block.cpp | 4 +- be/src/vec/exec/join/vhash_join_node.cpp | 213 ++++------------ be/src/vec/exec/join/vhash_join_node.h | 36 +-- .../java/org/apache/doris/analysis/Analyzer.java | 65 +++++ .../org/apache/doris/analysis/DescriptorTable.java | 17 -- .../apache/doris/analysis/ExprSubstitutionMap.java | 79 +----- .../java/org/apache/doris/analysis/SelectStmt.java | 8 + .../java/org/apache/doris/analysis/TableRef.java | 5 + ...ectorizedUtil.java => VecNotImplException.java} | 20 +- .../apache/doris/common/util/VectorizedUtil.java | 35 +++ .../org/apache/doris/planner/AggregationNode.java | 16 +- .../org/apache/doris/planner/HashJoinNode.java | 277 ++------------------- .../org/apache/doris/planner/OlapScanNode.java | 1 + .../java/org/apache/doris/planner/PlanNode.java | 15 +- .../org/apache/doris/planner/ProjectPlanner.java | 3 +- .../org/apache/doris/planner/SetOperationNode.java | 1 - .../apache/doris/planner/SingleNodePlanner.java | 18 +- .../java/org/apache/doris/planner/SortNode.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 9 + .../org/apache/doris/analysis/QueryStmtTest.java | 8 + .../doris/planner/ProjectPlannerFunctionTest.java | 4 +- .../org/apache/doris/planner/QueryPlanTest.java | 35 ++- gensrc/thrift/PlanNodes.thrift | 4 - 27 files changed, 254 insertions(+), 635 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index d0de6aa326..4030b552ac 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -214,9 +214,9 @@ Status ExecNode::prepare(RuntimeState* state) { _mem_tracker); if (_vconjunct_ctx_ptr) { - RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _row_descriptor, expr_mem_tracker())); + RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, row_desc(), expr_mem_tracker())); } - RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor, expr_mem_tracker())); + RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker())); // TODO(zc): // AddExprCtxsToFree(_conjunct_ctxs); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index e0a1428952..35e495d158 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -182,7 +182,7 @@ public: int id() const { return _id; } TPlanNodeType::type type() const { return _type; } - virtual const RowDescriptor& row_desc() const { return _row_descriptor; } + const RowDescriptor& row_desc() const { return _row_descriptor; } int64_t rows_returned() const { return _num_rows_returned; } int64_t limit() const { return _limit; } bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index acb8e7787c..215ced2383 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -464,11 +464,4 @@ ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) { return ColumnNullable::create(column, ColumnUInt8::create(column->size(), is_nullable ? 1 : 0)); } -ColumnPtr remove_nullable(const ColumnPtr& column) { - if (is_column_nullable(*column)) { - return reinterpret_cast<const ColumnNullable*>(column.get())->get_nested_column_ptr(); - } - return column; -} - } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 3b9465e9b4..e6f4495c30 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -289,6 +289,5 @@ private: }; ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable = false); -ColumnPtr remove_nullable(const ColumnPtr& column); } // namespace doris::vectorized diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index c95a97e6c9..3e50c1578b 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -601,7 +601,7 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t auto count = count_bytes_in_filter(filter); if (count == 0) { for (size_t i = 0; i < column_to_keep; ++i) { - std::move(*block->get_by_position(i).column).assume_mutable()->clear(); + std::move(*block->get_by_position(i).column).mutate()->clear(); } } else { if (count != block->rows()) { @@ -651,7 +651,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee bool ret = const_column->get_bool(0); if (!ret) { for (size_t i = 0; i < column_to_keep; ++i) { - std::move(*block->get_by_position(i).column).assume_mutable()->clear(); + std::move(*block->get_by_position(i).column).mutate()->clear(); } } } else { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 7aba12bee1..4471034ea4 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -489,7 +489,7 @@ struct ProcessHashTableProbe { typeid_cast<ColumnNullable*>( std::move(*output_block->get_by_position(j + right_col_idx) .column) - .assume_mutable() + .mutate() .get()) ->get_null_map_data()[i] = true; } @@ -587,9 +587,7 @@ struct ProcessHashTableProbe { auto& mcol = mutable_block.mutable_columns(); int right_col_idx = - (_join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct) - ? 0 - : _join_node->_left_table_data_types.size(); + _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); int right_col_len = _join_node->_right_table_data_types.size(); auto& iter = hash_table_ctx.iter; @@ -626,8 +624,7 @@ struct ProcessHashTableProbe { } *eos = iter == hash_table_ctx.hash_table.end(); - output_block->swap( - mutable_block.to_block(_join_node->_is_right_semi_anti ? right_col_idx : 0)); + output_block->swap(mutable_block.to_block()); return Status::OK(); } @@ -667,8 +664,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _is_outer_join(_match_all_build || _match_all_probe), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids - : std::vector<SlotId> {}), - _output_row_desc(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}) { + : std::vector<SlotId> {}) { _runtime_filter_descs = tnode.runtime_filters; init_join_op(); @@ -692,8 +688,8 @@ void HashJoinNode::init_join_op() { //do nothing break; } + return; } - Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); @@ -709,15 +705,15 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; - for (const auto& eq_join_conjunct : eq_join_conjuncts) { + for (int i = 0; i < eq_join_conjuncts.size(); ++i) { VExprContext* ctx = nullptr; - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx)); + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].left, &ctx)); _probe_expr_ctxs.push_back(ctx); - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.right, &ctx)); + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].right, &ctx)); _build_expr_ctxs.push_back(ctx); - bool null_aware = eq_join_conjunct.__isset.opcode && - eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL; + bool null_aware = eq_join_conjuncts[i].__isset.opcode && + eq_join_conjuncts[i].opcode == TExprOpcode::EQ_FOR_NULL; _is_null_safe_eq_join.push_back(null_aware); // if is null aware, build join column and probe join column both need dispose null value @@ -741,13 +737,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _have_other_join_conjunct = true; } - const auto& output_exprs = tnode.hash_join_node.srcExprList; - for (const auto& expr : output_exprs) { - VExprContext* ctx = nullptr; - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx)); - _output_expr_ctxs.push_back(ctx); - } - for (const auto& filter_desc : _runtime_filter_descs) { RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( RuntimeFilterRole::PRODUCER, filter_desc, state->query_options())); @@ -814,16 +803,12 @@ Status HashJoinNode::prepare(RuntimeState* state) { (*_vother_join_conjunct_ptr) ->prepare(state, _row_desc_for_other_join_conjunt, expr_mem_tracker())); } - - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_descriptor, expr_mem_tracker())); - // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc()); // Hash Table Init _hash_table_init(); - _construct_mutable_join_block(); _build_block_offsets.resize(state->batch_size()); _build_block_rows.resize(state->batch_size()); @@ -838,7 +823,6 @@ Status HashJoinNode::close(RuntimeState* state) { VExpr::close(_build_expr_ctxs, state); VExpr::close(_probe_expr_ctxs, state); if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state); - VExpr::close(_output_expr_ctxs, state); _hash_table_mem_tracker->release(_mem_used); @@ -856,7 +840,17 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo size_t probe_rows = _probe_block.rows(); if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) { _probe_index = 0; - _prepare_probe_block(); + // clear_column_data of _probe_block + { + if (!_probe_column_disguise_null.empty()) { + for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { + auto column_to_erase = _probe_column_disguise_null[i]; + _probe_block.erase(column_to_erase - i); + } + _probe_column_disguise_null.clear(); + } + release_block_memory(_probe_block); + } do { SCOPED_TIMER(_probe_next_timer); @@ -866,9 +860,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo probe_rows = _probe_block.rows(); if (probe_rows != 0) { COUNTER_UPDATE(_probe_rows_counter, probe_rows); - if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { - _probe_column_convert_to_null = _convert_block_to_null(_probe_block); - } int probe_expr_ctxs_sz = _probe_expr_ctxs.size(); _probe_columns.resize(probe_expr_ctxs_sz); @@ -882,9 +873,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { auto& null_map_val = _null_map_column->get_data(); - return _extract_probe_join_column(_probe_block, null_map_val, - _probe_columns, _probe_ignore_null, - *_probe_expr_call_timer); + return extract_probe_join_column(_probe_block, null_map_val, + _probe_columns, _probe_ignore_null, + *_probe_expr_call_timer); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -897,9 +888,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } Status st; - _join_block.clear_column_data(); - MutableBlock mutable_join_block(&_join_block); - Block temp_block; if (_probe_index < _probe_block.rows()) { std::visit( @@ -908,22 +896,33 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo using HashTableCtxType = std::decay_t<decltype(arg)>; using JoinOpType = std::decay_t<decltype(join_op_variants)>; if constexpr (have_other_join_conjunct) { + MutableBlock mutable_block( + VectorizedUtils::create_empty_columnswithtypename( + _row_desc_for_other_join_conjunt)); + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { ProcessHashTableProbe<HashTableCtxType, JoinOpType, probe_ignore_null> process_hashtable_ctx(this, state->batch_size(), probe_rows); st = process_hashtable_ctx.do_process_with_other_join_conjunts( - arg, &_null_map_column->get_data(), mutable_join_block, - &temp_block); + arg, &_null_map_column->get_data(), mutable_block, + output_block); } else { LOG(FATAL) << "FATAL: uninited hash table"; } } else { + MutableBlock mutable_block = + output_block->mem_reuse() + ? MutableBlock(output_block) + : MutableBlock( + VectorizedUtils::create_empty_columnswithtypename( + row_desc())); + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { ProcessHashTableProbe<HashTableCtxType, JoinOpType, probe_ignore_null> process_hashtable_ctx(this, state->batch_size(), probe_rows); st = process_hashtable_ctx.do_process(arg, &_null_map_column->get_data(), - mutable_join_block, &temp_block); + mutable_block, output_block); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -934,6 +933,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo make_bool_variant(_probe_ignore_null)); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { + MutableBlock mutable_block( + VectorizedUtils::create_empty_columnswithtypename(row_desc())); std::visit( [&](auto&& arg, auto&& join_op_variants) { using JoinOpType = std::decay_t<decltype(join_op_variants)>; @@ -941,8 +942,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { ProcessHashTableProbe<HashTableCtxType, JoinOpType, false> process_hashtable_ctx(this, state->batch_size(), probe_rows); - st = process_hashtable_ctx.process_data_in_hashtable( - arg, mutable_join_block, &temp_block, eos); + st = process_hashtable_ctx.process_data_in_hashtable(arg, mutable_block, + output_block, eos); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -957,74 +958,12 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } RETURN_IF_ERROR( - VExprContext::filter_block(_vconjunct_ctx_ptr, &temp_block, temp_block.columns())); - RETURN_IF_ERROR(_build_output_block(&temp_block, output_block)); + VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); reached_limit(output_block, eos); return st; } -void HashJoinNode::_prepare_probe_block() { - // clear_column_data of _probe_block - if (!_probe_column_disguise_null.empty()) { - for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { - auto column_to_erase = _probe_column_disguise_null[i]; - _probe_block.erase(column_to_erase - i); - } - _probe_column_disguise_null.clear(); - } - - // remove add nullmap of probe columns - for (auto index : _probe_column_convert_to_null) { - auto& column_type = _probe_block.safe_get_by_position(index); - DCHECK(column_type.column->is_nullable()); - DCHECK(column_type.type->is_nullable()); - - column_type.column = remove_nullable(column_type.column); - column_type.type = remove_nullable(column_type.type); - } - release_block_memory(_probe_block); -} - -void HashJoinNode::_construct_mutable_join_block() { - const auto& mutable_block_desc = - _have_other_join_conjunct ? _row_desc_for_other_join_conjunt : _row_descriptor; - - // TODO: Support Intermediate tuple in FE to delete the dispose the convert null operation - // here - auto [start_convert_null, end_convert_null] = std::pair {0, 0}; - - switch (_join_op) { - case TJoinOp::LEFT_OUTER_JOIN: { - start_convert_null = child(0)->row_desc().num_materialized_slots(); - end_convert_null = child(0)->row_desc().num_materialized_slots() + - child(1)->row_desc().num_materialized_slots(); - break; - } - case TJoinOp::RIGHT_OUTER_JOIN: { - end_convert_null = child(0)->row_desc().num_materialized_slots(); - break; - } - case TJoinOp::FULL_OUTER_JOIN: { - end_convert_null = child(0)->row_desc().num_materialized_slots() + - child(1)->row_desc().num_materialized_slots(); - break; - } - default: - break; - } - - for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) { - for (const auto slot_desc : tuple_desc->slots()) { - auto offset = _join_block.columns(); - auto type_ptr = (offset >= start_convert_null && offset < end_convert_null) - ? make_nullable(slot_desc->get_data_type_ptr()) - : slot_desc->get_data_type_ptr(); - _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()}); - } - } -} - Status HashJoinNode::open(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); @@ -1112,9 +1051,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { } // TODO:: unify the code of extract probe join column -Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map, - ColumnRawPtrs& raw_ptrs, bool& ignore_null, - RuntimeProfile::Counter& expr_call_timer) { +Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map, + ColumnRawPtrs& raw_ptrs, bool& ignore_null, + RuntimeProfile::Counter& expr_call_timer) { for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { int result_col_id = -1; // execute build column @@ -1150,9 +1089,9 @@ Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map, return Status::OK(); } -Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map, - ColumnRawPtrs& raw_ptrs, bool& ignore_null, - RuntimeProfile::Counter& expr_call_timer) { +Status HashJoinNode::extract_probe_join_column(Block& block, NullMap& null_map, + ColumnRawPtrs& raw_ptrs, bool& ignore_null, + RuntimeProfile::Counter& expr_call_timer) { for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { int result_col_id = -1; // execute build column @@ -1204,9 +1143,6 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin } COUNTER_UPDATE(_build_rows_counter, rows); - if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { - _convert_block_to_null(block); - } ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size()); NullMap null_map_val(rows); @@ -1218,8 +1154,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - return _extract_build_join_column(block, null_map_val, raw_ptrs, has_null, - *_build_expr_call_timer); + return extract_build_join_column(block, null_map_val, raw_ptrs, has_null, + *_build_expr_call_timer); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -1337,51 +1273,4 @@ void HashJoinNode::_hash_table_init() { _hash_table_variants.emplace<SerializedHashTableContext>(); } } - -std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) { - std::vector<uint16_t> results; - for (int i = 0; i < block.columns(); ++i) { - if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) { - DCHECK(!column_type.column->is_nullable()); - column_type.column = make_nullable(column_type.column); - column_type.type = make_nullable(column_type.type); - results.emplace_back(i); - } - } - return results; -} - -Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_block) { - auto is_mem_reuse = output_block->mem_reuse(); - MutableBlock mutable_block = - is_mem_reuse ? MutableBlock(output_block) - : MutableBlock(VectorizedUtils::create_empty_columnswithtypename( - _output_row_desc)); - auto rows = origin_block->rows(); - if (rows != 0) { - auto& mutable_columns = mutable_block.mutable_columns(); - if (_output_expr_ctxs.empty()) { - DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); - for (int i = 0; i < mutable_columns.size(); ++i) { - mutable_columns[i]->insert_range_from(*origin_block->get_by_position(i).column, 0, - rows); - } - } else { - DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); - for (int i = 0; i < mutable_columns.size(); ++i) { - auto result_column_id = -1; - RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id)); - auto column_ptr = origin_block->get_by_position(result_column_id) - .column->convert_to_full_column_if_const(); - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } - } - - if (!is_mem_reuse) output_block->swap(mutable_block.to_block()); - DCHECK(output_block->rows() == rows); - } - - return Status::OK(); -} - } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 0c8c658d58..7db2db48d3 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -147,17 +147,15 @@ public: HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~HashJoinNode() override; - Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; - Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; - Status get_next(RuntimeState* state, Block* block, bool* eos) override; - Status close(RuntimeState* state) override; + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + virtual Status prepare(RuntimeState* state) override; + virtual Status open(RuntimeState* state) override; + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; + virtual Status close(RuntimeState* state) override; HashTableVariants& get_hash_table_variants() { return _hash_table_variants; } void init_join_op(); - const RowDescriptor& row_desc() const override { return _output_row_desc; } - private: using VExprContexts = std::vector<VExprContext*>; @@ -170,8 +168,6 @@ private: VExprContexts _build_expr_ctxs; // other expr std::unique_ptr<VExprContext*> _vother_join_conjunct_ptr; - // output expr - VExprContexts _output_expr_ctxs; // mark the join column whether support null eq std::vector<bool> _is_null_safe_eq_join; @@ -182,7 +178,6 @@ private: std::vector<bool> _probe_not_ignore_null; std::vector<uint16_t> _probe_column_disguise_null; - std::vector<uint16_t> _probe_column_convert_to_null; DataTypes _right_table_data_types; DataTypes _left_table_data_types; @@ -231,7 +226,6 @@ private: bool _have_other_join_conjunct = false; RowDescriptor _row_desc_for_other_join_conjunt; - Block _join_block; std::vector<uint32_t> _items_counts; std::vector<int8_t> _build_block_offsets; @@ -243,8 +237,6 @@ private: std::vector<bool> _left_output_slot_flags; std::vector<bool> _right_output_slot_flags; - RowDescriptor _output_row_desc; - private: void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status); @@ -252,22 +244,14 @@ private: Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset); - Status _extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, - bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); + Status extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, + bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); - Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, - bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); + Status extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, + bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); void _hash_table_init(); - void _prepare_probe_block(); - - void _construct_mutable_join_block(); - - Status _build_output_block(Block* origin_block, Block* output_block); - - static std::vector<uint16_t> _convert_block_to_null(Block& block); - template <class HashTableContext, bool ignore_null, bool build_unique> friend struct ProcessHashTableBuild; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index ea3047a8d1..957b396a64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -37,6 +37,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.external.hudi.HudiTable; import org.apache.doris.external.hudi.HudiUtils; @@ -261,6 +262,8 @@ public class Analyzer { // to the last Join clause (represented by its rhs table ref) that outer-joined it private final Map<TupleId, TableRef> outerJoinedTupleIds = Maps.newHashMap(); + private final Set<TupleId> outerJoinedMaterializedTupleIds = Sets.newHashSet(); + // Map of registered conjunct to the last full outer join (represented by its // rhs table ref) that outer joined it. public final Map<ExprId, TableRef> fullOuterJoinedConjuncts = Maps.newHashMap(); @@ -786,6 +789,13 @@ public class Analyzer { String key = d.getAlias() + "." + col.getName(); SlotDescriptor result = slotRefMap.get(key); if (result != null) { + // this is a trick to set slot as nullable when slot is on inline view + // When analyze InlineViewRef, we first generate sMap and baseTblSmap and then analyze join. + // We have already registered column ref at that time, but we did not know + // whether inline view is outer joined. So we have to check it and set slot as nullable here. + if (isOuterJoined(d.getId())) { + result.setIsNullable(true); + } result.setMultiRef(true); return result; } @@ -941,6 +951,57 @@ public class Analyzer { } } + public void registerOuterJoinedMaterilizeTids(List<TupleId> tids) { + globalState.outerJoinedMaterializedTupleIds.addAll(tids); + } + + /** + * The main function of this method is to set the column property on the nullable side of the outer join + * to nullable in the case of vectorization. + * For example: + * Query: select * from t1 left join t2 on t1.k1=t2.k1 + * Origin: t2.k1 not null + * Result: t2.k1 is nullable + * + * @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable. + * It will report an error and fall back from vectorized mode to non-vectorized mode for execution. + * If the nullside column of the outer join is a column that must return non-null like count(*) + * then there is no way to force the column to be nullable. + * At this time, vectorization cannot support this situation, + * so it is necessary to fall back to non-vectorization for processing. + * For example: + * Query: select * from t1 left join + * (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1 + * Origin: tmp.k1 not null, tmp.count_k2 not null + * Result: throw VecNotImplException + */ + public void changeAllOuterJoinTupleToNull() throws VecNotImplException { + for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) { + for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { + changeSlotToNull(slotDescriptor); + } + } + + for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) { + for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { + changeSlotToNull(slotDescriptor); + } + } + } + + private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException { + if (slotDescriptor.getSourceExprs().isEmpty()) { + slotDescriptor.setIsNullable(true); + return; + } + for (Expr sourceExpr : slotDescriptor.getSourceExprs()) { + if (!sourceExpr.isNullable()) { + throw new VecNotImplException("The slot (" + slotDescriptor.toString() + + ") could not be changed to nullable"); + } + } + } + /** * Register the given tuple id as being the invisible side of a semi-join. */ @@ -1360,6 +1421,10 @@ public class Analyzer { return globalState.fullOuterJoinedTupleIds.containsKey(tid); } + public boolean isOuterMaterializedJoined(TupleId tid) { + return globalState.outerJoinedMaterializedTupleIds.contains(tid); + } + public boolean isFullOuterJoined(SlotId sid) { return isFullOuterJoined(getTupleId(sid)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 569be69a98..ed7ba00dd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -21,11 +21,9 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Table; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.IdGenerator; import org.apache.doris.thrift.TDescriptorTable; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -115,21 +113,6 @@ public class DescriptorTable { return tupleDescs.get(id); } - /** - * Return all tuple desc by idList. - */ - public List<TupleDescriptor> getTupleDesc(List<TupleId> idList) throws AnalysisException { - List<TupleDescriptor> result = Lists.newArrayList(); - for (TupleId tupleId : idList) { - TupleDescriptor tupleDescriptor = getTupleDesc(tupleId); - if (tupleDescriptor == null) { - throw new AnalysisException("Invalid tuple id:" + tupleId.toString()); - } - result.add(tupleDescriptor); - } - return result; - } - public SlotDescriptor getSlotDesc(SlotId id) { return slotDescs.get(id); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java index 4145ee4536..966cfa7e0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java @@ -88,34 +88,13 @@ public final class ExprSubstitutionMap { return lhs.contains(lhsExpr); } - /** - * Returns lhs if the smap contains a mapping for rhsExpr. - */ - public Expr mappingForRhsExpr(Expr rhsExpr) { - for (int i = 0; i < rhs.size(); ++i) { - if (rhs.get(i).equals(rhsExpr)) { - return lhs.get(i); - } - } - return null; - } - - public void removeByRhsExpr(Expr rhsExpr) { - for (int i = 0; i < rhs.size(); ++i) { - if (rhs.get(i).equals(rhsExpr)) { - lhs.remove(i); - rhs.remove(i); - break; - } - } - } - /** * Return a map which is equivalent to applying f followed by g, * i.e., g(f()). * Always returns a non-null map. */ - public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g, Analyzer analyzer) { + public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g, + Analyzer analyzer) { if (f == null && g == null) { return new ExprSubstitutionMap(); } @@ -151,61 +130,11 @@ public final class ExprSubstitutionMap { return result; } - /** - * Returns the subtraction of two substitution maps. - * f [A.id, B.id] g [A.id, C.id] - * return: g-f [B,id, C,id] - */ - public static ExprSubstitutionMap subtraction(ExprSubstitutionMap f, ExprSubstitutionMap g) { - if (f == null && g == null) { - return new ExprSubstitutionMap(); - } - if (f == null) { - return g; - } - if (g == null) { - return f; - } - ExprSubstitutionMap result = new ExprSubstitutionMap(); - for (int i = 0; i < g.size(); i++) { - if (f.containsMappingFor(g.lhs.get(i))) { - result.put(f.get(g.lhs.get(i)), g.rhs.get(i)); - } else { - result.put(g.lhs.get(i), g.rhs.get(i)); - } - } - return result; - } - - /** - * Returns the replace of two substitution maps. - * f [A.id, B.id] [A.name, B.name] g [A.id, C.id] [A.age, C.age] - * return: [A.id, C,id] [A.name, B.name] [A.age, C.age] - */ - public static ExprSubstitutionMap combineAndReplace(ExprSubstitutionMap f, ExprSubstitutionMap g) { - if (f == null && g == null) { - return new ExprSubstitutionMap(); - } - if (f == null) { - return g; - } - if (g == null) { - return f; - } - ExprSubstitutionMap result = new ExprSubstitutionMap(); - result = ExprSubstitutionMap.combine(result, g); - for (int i = 0; i < f.size(); i++) { - if (!result.containsMappingFor(f.lhs.get(i))) { - result.put(f.lhs.get(i), f.rhs.get(i)); - } - } - return result; - } - /** * Returns the union of two substitution maps. Always returns a non-null map. */ - public static ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) { + public static ExprSubstitutionMap combine(ExprSubstitutionMap f, + ExprSubstitutionMap g) { if (f == null && g == null) { return new ExprSubstitutionMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index d6f413e89e..42a3225699 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -40,6 +40,7 @@ import org.apache.doris.common.TableAliasGenerator; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlUtils; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; @@ -512,6 +513,13 @@ public class SelectStmt extends QueryStmt { analyzer.registerConjuncts(whereClause, false, getTableRefIds()); } + // Change all outer join tuple to null here after analyze where and from clause + // all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info + // the solt desc nullable mark must be corrected to make sure BE exec query right. + if (VectorizedUtil.isVectorized()) { + analyzer.changeAllOuterJoinTupleToNull(); + } + createSortInfo(analyzer); if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) { if (groupingInfo != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 6d8b9ddb01..107a9a3637 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -489,15 +489,20 @@ public class TableRef implements ParseNode, Writable { if (joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerOuterJoinedTids(getId().asList(), this); + analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds()); } if (joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerOuterJoinedTids(leftTblRef.getAllTableRefIds(), this); + analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds()); } // register the tuple ids of a full outer join if (joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerFullOuterJoinedTids(leftTblRef.getAllTableRefIds(), this); analyzer.registerFullOuterJoinedTids(getId().asList(), this); + + analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds()); + analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds()); } // register the tuple id of the rhs of a left semi join diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java similarity index 53% copy from fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java copy to fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java index 296ae5571b..2c5d12e7d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java @@ -15,22 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.util; +package org.apache.doris.common; -import org.apache.doris.qe.ConnectContext; - -public class VectorizedUtil { - /** - * 1. Return false if there is no current connection (Rule1 to be changed) - * 2. Returns the vectorized switch value of the query 'globalState.enableQueryVec' - * 3. If it is not currently a query, return the vectorized switch value of the session 'enableVectorizedEngine' - * @return true: vec. false: non-vec - */ - public static boolean isVectorized() { - ConnectContext connectContext = ConnectContext.get(); - if (connectContext == null) { - return false; - } - return connectContext.getSessionVariable().enableVectorizedEngine(); +public class VecNotImplException extends UserException { + public VecNotImplException(String msg) { + super(msg); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java index 296ae5571b..0eba9f9fc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java @@ -17,7 +17,12 @@ package org.apache.doris.common.util; +import org.apache.doris.analysis.SetVar; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.common.DdlException; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.VariableMgr; public class VectorizedUtil { /** @@ -33,4 +38,34 @@ public class VectorizedUtil { } return connectContext.getSessionVariable().enableVectorizedEngine(); } + + /** + * The purpose of this function is to turn off the vectorization switch for the current query. + * When the vectorization engine cannot meet the requirements of the current query, + * it will convert the current query into a non-vectorized query. + * Note that this will only change the **vectorization switch for a single query**, + * and will not affect other queries in the same session. + * Therefore, even if the vectorization switch of the current query is turned off, + * the vectorization properties of subsequent queries will not be affected. + * + * Session: set enable_vectorized_engine=true; + * Query1: select * from table (vec) + * Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec) + * Query3: select * from table (still vec) + */ + public static void switchToQueryNonVec() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return; + } + SessionVariable sessionVariable = connectContext.getSessionVariable(); + sessionVariable.setIsSingleSetVar(true); + try { + VariableMgr.setVar(sessionVariable, new SetVar( + "enable_vectorized_engine", + new StringLiteral("false"))); + } catch (DdlException e) { + // do nothing + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index c8561b54dc..a83aedaa49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -25,7 +25,6 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotId; -import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; @@ -312,7 +311,7 @@ public class AggregationNode extends PlanNode { } @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { + public Set<SlotId> computeInputSlotIds() throws NotImplementedException { Set<SlotId> result = Sets.newHashSet(); // compute group by slot ArrayList<Expr> groupingExprs = aggInfo.getGroupingExprs(); @@ -325,19 +324,6 @@ public class AggregationNode extends PlanNode { List<SlotId> aggregateSlotIds = Lists.newArrayList(); Expr.getIds(aggregateExprs, null, aggregateSlotIds); result.addAll(aggregateSlotIds); - - // case: select count(*) from test - // result is empty - // Actually need to take a column as the input column of the agg operator - if (result.isEmpty()) { - TupleDescriptor tupleDesc = analyzer.getTupleDesc(getChild(0).getOutputTupleIds().get(0)); - // If the query result is empty set such as: select count(*) from table where 1=2 - // then the materialized slot will be empty - // So the result should be empty also. - if (!tupleDesc.getMaterializedSlots().isEmpty()) { - result.add(tupleDesc.getMaterializedSlots().get(0).getId()); - } - } return result; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 3332182069..9192c0981f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -29,12 +29,10 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TableRef; -import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CheckedMath; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; @@ -56,7 +54,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -87,8 +84,6 @@ public class HashJoinNode extends PlanNode { private boolean isBucketShuffle = false; // the flag for bucket shuffle join private List<SlotId> hashOutputSlotIds; - private TupleDescriptor vOutputTupleDesc; - private ExprSubstitutionMap vSrcToOutputSMap; /** * Constructor of HashJoinNode. @@ -254,100 +249,38 @@ public class HashJoinNode extends PlanNode { * * @param slotIdList */ - private void initHashOutputSlotIds(List<SlotId> slotIdList, Analyzer analyzer) { - Set<SlotId> hashOutputSlotIdSet = Sets.newHashSet(); - // step1: change output slot id to src slot id - if (vSrcToOutputSMap != null) { - for (SlotId slotId : slotIdList) { - SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); - Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); - if (srcExpr == null) { - hashOutputSlotIdSet.add(slotId); - } else { - List<SlotRef> srcSlotRefList = Lists.newArrayList(); - srcExpr.collect(SlotRef.class, srcSlotRefList); - hashOutputSlotIdSet - .addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); - } - } - } - - // step2: add conjuncts required slots + private void initHashOutputSlotIds(List<SlotId> slotIdList) { + hashOutputSlotIds = new ArrayList<>(slotIdList); List<SlotId> otherAndConjunctSlotIds = Lists.newArrayList(); Expr.getIds(otherJoinConjuncts, null, otherAndConjunctSlotIds); Expr.getIds(conjuncts, null, otherAndConjunctSlotIds); - hashOutputSlotIdSet.addAll(otherAndConjunctSlotIds); - hashOutputSlotIds = new ArrayList<>(hashOutputSlotIdSet); + for (SlotId slotId : otherAndConjunctSlotIds) { + if (!hashOutputSlotIds.contains(slotId)) { + hashOutputSlotIds.add(slotId); + } + } } @Override public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) { outputSlotIds = Lists.newArrayList(); - List<TupleDescriptor> outputTupleDescList = Lists.newArrayList(); - if (vOutputTupleDesc != null) { - outputTupleDescList.add(vOutputTupleDesc); - } else { - for (TupleId tupleId : tupleIds) { - outputTupleDescList.add(analyzer.getTupleDesc(tupleId)); - } - } - for (TupleDescriptor tupleDescriptor : outputTupleDescList) { - for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) { - if (slotDescriptor.isMaterialized() - && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) { + for (TupleId tupleId : tupleIds) { + for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) { + if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains( + slotDescriptor.getId()))) { outputSlotIds.add(slotDescriptor.getId()); } } } - initHashOutputSlotIds(outputSlotIds, analyzer); - } - - @Override - public void projectOutputTuple() throws NotImplementedException { - if (vOutputTupleDesc == null) { - return; - } - if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) { - return; - } - Iterator<SlotDescriptor> iterator = vOutputTupleDesc.getSlots().iterator(); - while (iterator.hasNext()) { - SlotDescriptor slotDescriptor = iterator.next(); - boolean keep = false; - for (SlotId outputSlotId : outputSlotIds) { - if (slotDescriptor.getId().equals(outputSlotId)) { - keep = true; - break; - } - } - if (!keep) { - iterator.remove(); - SlotRef slotRef = new SlotRef(slotDescriptor); - vSrcToOutputSMap.removeByRhsExpr(slotRef); - } - } - vOutputTupleDesc.computeStatAndMemLayout(); + initHashOutputSlotIds(outputSlotIds); } // output slots + predicate slots = input slots @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { - Set<SlotId> result = Sets.newHashSet(); + public Set<SlotId> computeInputSlotIds() throws NotImplementedException { Preconditions.checkState(outputSlotIds != null); - // step1: change output slot id to src slot id - if (vSrcToOutputSMap != null) { - for (SlotId slotId : outputSlotIds) { - SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); - Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); - if (srcExpr == null) { - result.add(slotId); - } else { - List<SlotRef> srcSlotRefList = Lists.newArrayList(); - srcExpr.collect(SlotRef.class, srcSlotRefList); - result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); - } - } - } + Set<SlotId> result = Sets.newHashSet(); + result.addAll(outputSlotIds); // eq conjunct List<SlotId> eqConjunctSlotIds = Lists.newArrayList(); Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds); @@ -374,109 +307,14 @@ public class HashJoinNode extends PlanNode { ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); - eqJoinConjuncts = - newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity).collect(Collectors.toList()); + eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity) + .collect(Collectors.toList()); assignedConjuncts = analyzer.getAssignedConjuncts(); otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); - - // Only for Vec: create new tuple for join result - if (VectorizedUtil.isVectorized()) { - computeOutputTuple(analyzer); - } - } - - private void computeOutputTuple(Analyzer analyzer) throws AnalysisException { - // 1. create new tuple - vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); - boolean copyLeft = false; - boolean copyRight = false; - boolean leftNullable = false; - boolean rightNullable = false; - switch (joinOp) { - case INNER_JOIN: - case CROSS_JOIN: - copyLeft = true; - copyRight = true; - break; - case LEFT_OUTER_JOIN: - copyLeft = true; - copyRight = true; - rightNullable = true; - break; - case RIGHT_OUTER_JOIN: - copyLeft = true; - copyRight = true; - leftNullable = true; - break; - case FULL_OUTER_JOIN: - copyLeft = true; - copyRight = true; - leftNullable = true; - rightNullable = true; - break; - case LEFT_ANTI_JOIN: - case LEFT_SEMI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: - copyLeft = true; - break; - case RIGHT_ANTI_JOIN: - case RIGHT_SEMI_JOIN: - copyRight = true; - break; - default: - break; - } - ExprSubstitutionMap srcTblRefToOutputTupleSmap = new ExprSubstitutionMap(); - if (copyLeft) { - for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(getChild(0).getOutputTblRefIds())) { - for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) { - if (!isMaterailizedByChild(leftSlotDesc, getChild(0).getOutputSmap())) { - continue; - } - SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc); - if (leftNullable) { - outputSlotDesc.setIsNullable(true); - } - srcTblRefToOutputTupleSmap.put(new SlotRef(leftSlotDesc), new SlotRef(outputSlotDesc)); - } - } - } - if (copyRight) { - for (TupleDescriptor rightTupleDesc : - analyzer.getDescTbl().getTupleDesc(getChild(1).getOutputTblRefIds())) { - for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) { - if (!isMaterailizedByChild(rightSlotDesc, getChild(1).getOutputSmap())) { - continue; - } - SlotDescriptor outputSlotDesc = - analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc); - if (rightNullable) { - outputSlotDesc.setIsNullable(true); - } - srcTblRefToOutputTupleSmap.put(new SlotRef(rightSlotDesc), new SlotRef(outputSlotDesc)); - } - } - } - // 2. compute srcToOutputMap - vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap); - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - Preconditions.checkState(vSrcToOutputSMap.getRhs().get(i) instanceof SlotRef); - SlotRef rSlotRef = (SlotRef) vSrcToOutputSMap.getRhs().get(i); - if (vSrcToOutputSMap.getLhs().get(i) instanceof SlotRef) { - SlotRef lSlotRef = (SlotRef) vSrcToOutputSMap.getLhs().get(i); - rSlotRef.getDesc().setIsMaterialized(lSlotRef.getDesc().isMaterialized()); - } else { - rSlotRef.getDesc().setIsMaterialized(true); - } - } - vOutputTupleDesc.computeStatAndMemLayout(); - // 3. change the outputSmap - outputSmap = ExprSubstitutionMap.combineAndReplace(outputSmap, srcTblRefToOutputTupleSmap); } private void replaceOutputSmapForOuterJoin() { - if (joinOp.isOuterJoin() && !VectorizedUtil.isVectorized()) { + if (joinOp.isOuterJoin()) { List<Expr> lhs = new ArrayList<>(); List<Expr> rhs = new ArrayList<>(); @@ -909,14 +747,6 @@ public class HashJoinNode extends PlanNode { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } - if (vSrcToOutputSMap != null) { - for (int i = 0; i < vSrcToOutputSMap.size(); i++) { - msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - } - } - if (vOutputTupleDesc != null) { - msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); - } } @Override @@ -951,9 +781,6 @@ public class HashJoinNode extends PlanNode { } output.append(detailPrefix).append(String.format("cardinality=%s", cardinality)).append("\n"); // todo unify in plan node - if (vOutputTupleDesc != null) { - output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()); - } if (outputSlotIds != null) { output.append(detailPrefix).append("output slot ids: "); for (SlotId slotId : outputSlotIds) { @@ -1003,72 +830,4 @@ public class HashJoinNode extends PlanNode { } super.convertToVectoriezd(); } - - /** - * If parent wants to get hash join node tupleids, - * it will call this function instead of read properties directly. - * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for hash join node. - * - * If you read the properties of @tupleids directly instead of this function, - * it reads the input id of the current node. - */ - @Override - public ArrayList<TupleId> getTupleIds() { - Preconditions.checkState(tupleIds != null); - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); - } - return tupleIds; - } - - @Override - public ArrayList<TupleId> getOutputTblRefIds() { - switch (joinOp) { - case LEFT_SEMI_JOIN: - case LEFT_ANTI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: - return getChild(0).getOutputTblRefIds(); - case RIGHT_SEMI_JOIN: - case RIGHT_ANTI_JOIN: - return getChild(1).getOutputTblRefIds(); - default: - return getTblRefIds(); - } - } - - @Override - public ArrayList<TupleId> getOutputTupleIds() { - if (vOutputTupleDesc != null) { - return Lists.newArrayList(vOutputTupleDesc.getId()); - } - switch (joinOp) { - case LEFT_SEMI_JOIN: - case LEFT_ANTI_JOIN: - case NULL_AWARE_LEFT_ANTI_JOIN: - return getChild(0).getOutputTupleIds(); - case RIGHT_SEMI_JOIN: - case RIGHT_ANTI_JOIN: - return getChild(1).getOutputTupleIds(); - default: - return tupleIds; - } - } - - private boolean isMaterailizedByChild(SlotDescriptor slotDesc, ExprSubstitutionMap smap) { - if (slotDesc.isMaterialized()) { - return true; - } - Expr child = smap.get(new SlotRef(slotDesc)); - if (child == null) { - return false; - } - List<SlotRef> slotRefList = Lists.newArrayList(); - child.collect(SlotRef.class, slotRefList); - for (SlotRef slotRef : slotRefList) { - if (slotRef.getDesc() != null && !slotRef.getDesc().isMaterialized()) { - return false; - } - } - return true; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 5f1d4293b0..466e69a53c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -894,6 +894,7 @@ public class OlapScanNode extends ScanNode { SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN); deleteSignSlot.analyze(analyzer); deleteSignSlot.getDesc().setIsMaterialized(true); + deleteSignSlot.getDesc().setIsNullable(analyzer.isOuterMaterializedJoined(desc.getId())); Expr conjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, deleteSignSlot, new IntLiteral(0)); conjunct.analyze(analyzer); conjuncts.add(conjunct); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 1d7c9b8273..2a030fbf7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -318,14 +318,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { tblRefIds = ids; } - public ArrayList<TupleId> getOutputTblRefIds() { - return tblRefIds; - } - - public ArrayList<TupleId> getOutputTupleIds() { - return tupleIds; - } - public Set<TupleId> getNullableTupleIds() { Preconditions.checkState(nullableTupleIds != null); return nullableTupleIds; @@ -961,11 +953,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { throw new NotImplementedException("The `initOutputSlotIds` hasn't been implemented in " + planNodeName); } - public void projectOutputTuple() throws NotImplementedException { - throw new NotImplementedException("The `projectOutputTuple` hasn't been implemented in " + planNodeName + ". " - + "But it does not affect the project optimizer"); - } - /** * If an plan node implements this method, its child plan node has the ability to implement the project. * The return value of this method will be used as @@ -985,7 +972,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { * agg node * (required slots: a.k1) */ - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { + public Set<SlotId> computeInputSlotIds() throws NotImplementedException { throw new NotImplementedException("The `computeInputSlotIds` hasn't been implemented in " + planNodeName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java index 643d9ae863..649c6d5270 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java @@ -47,7 +47,6 @@ public class ProjectPlanner { public void projectPlanNode(Set<SlotId> outputSlotIds, PlanNode planNode) { try { planNode.initOutputSlotIds(outputSlotIds, analyzer); - planNode.projectOutputTuple(); } catch (NotImplementedException e) { LOG.debug(e); } @@ -56,7 +55,7 @@ public class ProjectPlanner { } Set<SlotId> inputSlotIds = null; try { - inputSlotIds = planNode.computeInputSlotIds(analyzer); + inputSlotIds = planNode.computeInputSlotIds(); } catch (NotImplementedException e) { LOG.debug(e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index 6e56f6ffd2..95a13061e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -347,7 +347,6 @@ public abstract class SetOperationNode extends PlanNode { @Override public void init(Analyzer analyzer) throws UserException { Preconditions.checkState(conjuncts.isEmpty()); - createDefaultSmap(analyzer); computeTupleStatAndMemLayout(analyzer); computeStats(analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 32d0f3961d..38711d025f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1354,14 +1354,9 @@ public class SingleNodePlanner { } unionNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId())); unionNode.addConstExprList(selectStmt.getBaseTblResultExprs()); - unionNode.init(analyzer); //set outputSmap to substitute literal in outputExpr - unionNode.setWithoutTupleIsNullOutputSmap(inlineViewRef.getSmap()); - if (analyzer.isOuterJoined(inlineViewRef.getId())) { - List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs( - inlineViewRef.getSmap().getRhs(), unionNode.getTupleIds(), analyzer); - unionNode.setOutputSmap(new ExprSubstitutionMap(inlineViewRef.getSmap().getLhs(), nullableRhs)); - } + unionNode.setOutputSmap(inlineViewRef.getSmap()); + unionNode.init(analyzer); return unionNode; } } @@ -1389,6 +1384,15 @@ public class SingleNodePlanner { List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs( outputSmap.getRhs(), rootNode.getTupleIds(), analyzer); outputSmap = new ExprSubstitutionMap(outputSmap.getLhs(), nullableRhs); + // When we process outer join with inline views, we set slot descriptor of inline view to nullable firstly. + // When we generate plan, we remove inline view, so the upper node's input is inline view's child. + // So we need to set slot descriptor of inline view's child to nullable to ensure consistent behavior + // with BaseTable. + for (TupleId tupleId : rootNode.getTupleIds()) { + for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getMaterializedSlots()) { + slotDescriptor.setIsNullable(true); + } + } } // Set output smap of rootNode *before* creating a SelectNode for proper resolution. rootNode.setOutputSmap(outputSmap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 09b783a8c5..2a7d3e7b29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -161,7 +161,7 @@ public class SortNode extends PlanNode { } @Override - public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { + public Set<SlotId> computeInputSlotIds() throws NotImplementedException { List<SlotId> result = Lists.newArrayList(); Expr.getIds(resolvedTupleExprs, null, result); return new HashSet<>(result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3ed8f26156..b50f77fb3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -70,6 +70,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.Version; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; @@ -79,6 +80,7 @@ import org.apache.doris.common.util.QueryPlannerProfile; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.EtlJobType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -606,6 +608,13 @@ public class StmtExecutor implements ProfileWriter { } else { resetAnalyzerAndStmt(); } + } catch (VecNotImplException e) { + if (i == analyzeTimes) { + throw e; + } else { + resetAnalyzerAndStmt(); + VectorizedUtil.switchToQueryNonVec(); + } } catch (UserException e) { throw e; } catch (Exception e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java index 269daa39ff..de0d525daa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import org.apache.doris.common.VecNotImplException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.rewrite.FoldConstantsRule; @@ -244,6 +245,13 @@ public class QueryStmtTest { constMap.clear(); constMap = getConstantExprMap(exprsMap, analyzer); Assert.assertEquals(4, constMap.size()); + } else { + try { + UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); + Assert.fail(); + } catch (VecNotImplException e) { + Assert.assertTrue(e.getMessage().contains("could not be changed to nullable")); + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java index 375afd5fef..0159edba6c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java @@ -87,8 +87,8 @@ public class ProjectPlannerFunctionTest { String queryStr = "desc verbose select a.k2 from test.t1 a inner join test.t1 b on a.k1=b.k1 " + "inner join test.t1 c on a.k1=c.k1;"; String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); - Assert.assertTrue(explainString.contains("output slot ids: 8")); - Assert.assertTrue(explainString.contains("output slot ids: 4 5")); + Assert.assertTrue(explainString.contains("output slot ids: 3")); + Assert.assertTrue(explainString.contains("output slot ids: 0 3")); } // keep a.k2 after a join b diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index a439072b39..8d9a115094 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1136,21 +1136,20 @@ public class QueryPlanTest extends TestWithFeService { Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE")); // support recurse of bucket shuffle join - // TODO: support the UT in the future queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2" + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3" + " on t2.k1 = t3.k1 and t2.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2" + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3" + " on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2" @@ -1183,9 +1182,6 @@ public class QueryPlanTest extends TestWithFeService { } } - // disable bucket shuffle join - Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); - String queryStr = "explain select * from mysql_table t2, jointest t1 where t1.k1 = t2.k1"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)")); @@ -1233,8 +1229,6 @@ public class QueryPlanTest extends TestWithFeService { } } - // disable bucket shuffle join - Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); String queryStr = "explain select * from odbc_mysql t2, jointest t1 where t1.k1 = t2.k1"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)")); @@ -1329,9 +1323,7 @@ public class QueryPlanTest extends TestWithFeService { @Test public void testPreferBroadcastJoin() throws Exception { connectContext.setDatabase("default_cluster:test"); - String queryStr = "explain select * from (select k2 from jointest)t2, jointest t1 where t1.k1 = t2.k2"; - // disable bucket shuffle join - Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); + String queryStr = "explain select * from (select k2 from jointest group by k2)t2, jointest t1 where t1.k1 = t2.k2"; // default set PreferBroadcastJoin true String explainString = getSQLPlanOrErrorMsg(queryStr); @@ -1597,31 +1589,32 @@ public class QueryPlanTest extends TestWithFeService { //valid date String sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a right outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; String explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:`a`.`aid` | 4")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a left outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:3 | `b`.`bid`")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a full outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:`a`.`aid` | `b`.`bid`")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:3 | 4")); sql = "SELECT a.k1, b.k2 FROM (SELECT k1 from baseall) a LEFT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>")); + Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)")); sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a RIGHT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>")); + Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)")); sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a FULL JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>")); + Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)")); + Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)")); } @Test @@ -2070,7 +2063,7 @@ public class QueryPlanTest extends TestWithFeService { String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertFalse(explainString.contains("OUTPUT EXPRS:3 | 4")); System.out.println(explainString); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:CAST(<slot 4> AS INT) | CAST(<slot 5> AS INT)")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:CAST(`a`.`aid` AS INT) | 4")); } @Test diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 06d6e26c3b..fb3a87fc4a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -409,10 +409,6 @@ struct THashJoinNode { // hash output column 6: optional list<Types.TSlotId> hash_output_slot_ids - - 7: optional list<Exprs.TExpr> srcExprList - - 8: optional Types.TTupleId voutput_tuple_id } struct TMergeJoinNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org