This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5c020be4d24 [Bug](join) corner case cause the mark join + null aware left join core dump in regression test in pipeline query engine (#25087) 5c020be4d24 is described below commit 5c020be4d247c6c762cff53e2c1f9d28efa68b20 Author: HappenLee <happen...@hotmail.com> AuthorDate: Sun Oct 8 22:50:12 2023 +0800 [Bug](join) corner case cause the mark join + null aware left join core dump in regression test in pipeline query engine (#25087) --- be/src/pipeline/exec/operator.h | 7 +--- be/src/pipeline/pipeline_task.cpp | 7 +--- be/src/vec/exec/join/vhash_join_node.cpp | 70 +++++++++++++++----------------- be/src/vec/exec/join/vhash_join_node.h | 4 +- be/src/vec/exec/join/vjoin_node_base.h | 4 +- be/src/vec/exec/scan/vscan_node.cpp | 3 -- 6 files changed, 39 insertions(+), 56 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 73e2d5d41b8..4ba2aec977f 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -450,12 +450,7 @@ public: if (node->need_more_input_data()) { _child_block->clear_column_data(); - Status status = child->get_block(state, _child_block.get(), _child_source_state); - if (status.is<777>()) { - LOG(INFO) << "Scan block nullptr error _source_state:" << int(source_state) - << " query id:" << print_id(state->query_id()); - } - RETURN_IF_ERROR(status); + RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state)); source_state = _child_source_state; if (_child_block->rows() == 0 && _child_source_state != SourceState::FINISHED) { return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 8061253d2ec..a0f77578e73 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -289,12 +289,7 @@ Status PipelineTask::execute(bool* eos) { { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); - auto status = _root->get_block(_state, block, _data_state); - if (status.is<777>()) { - LOG(FATAL) << "Scan block nullptr error: can read:" << source_can_read() - << " query id:" << print_id(_state->query_id()); - } - RETURN_IF_ERROR(status); + RETURN_IF_ERROR(_root->get_block(_state, block, _data_state)); } *eos = _data_state == SourceState::FINISHED; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 75ea6b06ba8..aa91846cc8b 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -418,7 +418,7 @@ Status HashJoinNode::close(RuntimeState* state) { bool HashJoinNode::need_more_input_data() const { return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos && - (!_short_circuit_for_probe || _is_mark_join); + !_short_circuit_for_probe; } void HashJoinNode::prepare_for_next() { @@ -430,45 +430,46 @@ void HashJoinNode::prepare_for_next() { Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_probe_timer); if (_short_circuit_for_probe) { - /// If `_short_circuit_for_probe` is true, this indicates no rows + // If we use a short-circuit strategy, should return empty block directly. + *eos = true; + return Status::OK(); + } + + if (_short_circuit_for_null_in_probe_side && _is_mark_join) { + /// If `_short_circuit_for_null_in_probe_side` is true, this indicates no rows /// match the join condition, and this is 'mark join', so we need to create a column as mark /// with all rows set to 0. - if (_is_mark_join) { - auto block_rows = _probe_block.rows(); - if (block_rows == 0) { - *eos = _probe_eos; - return Status::OK(); - } - - Block temp_block; - //get probe side output column - for (int i = 0; i < _left_output_slot_flags.size(); ++i) { - if (_left_output_slot_flags[i]) { - temp_block.insert(_probe_block.get_by_position(i)); - } - } - auto mark_column = ColumnUInt8::create(block_rows, 0); - temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""}); + auto block_rows = _probe_block.rows(); + if (block_rows == 0) { + *eos = _probe_eos; + return Status::OK(); + } - { - SCOPED_TIMER(_join_filter_timer); - RETURN_IF_ERROR( - VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns())); + Block temp_block; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + temp_block.insert(_probe_block.get_by_position(i)); } + } + auto mark_column = ColumnUInt8::create(block_rows, 0); + temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""}); - RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); - temp_block.clear(); - release_block_memory(_probe_block); - reached_limit(output_block, eos); - return Status::OK(); + { + SCOPED_TIMER(_join_filter_timer); + RETURN_IF_ERROR( + VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns())); } - // If we use a short-circuit strategy, should return empty block directly. - *eos = true; + + RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false)); + temp_block.clear(); + release_block_memory(_probe_block); + reached_limit(output_block, eos); return Status::OK(); } //TODO: this short circuit maybe could refactor, no need to check at here. - if (_short_circuit_for_probe_and_additional_data) { + if (_empty_right_table_need_probe_dispose) { // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN // we could get the result is probe table + null-column(if need output) // If we use a short-circuit strategy, should return block directly by add additional null data. @@ -641,13 +642,8 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (_is_hash_join_early_start_probe_eos(state)) { - *eos = true; - return Status::OK(); - } - - if (_short_circuit_for_probe && !_is_mark_join) { - // If we use a short-circuit strategy, should return empty block directly. + // If we use a short-circuit strategy, should return empty block directly. + if (_is_hash_join_early_start_probe_eos(state) || _short_circuit_for_probe) { *eos = true; return Status::OK(); } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index c60f1a0c7ae..c75ab58357c 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -577,7 +577,7 @@ private: void _init_short_circuit_for_probe() override { _short_circuit_for_probe = (_short_circuit_for_null_in_probe_side && - _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || + _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) || (_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) || @@ -586,7 +586,7 @@ private: //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN //we could get the result is probe table + null-column(if need output) - _short_circuit_for_probe_and_additional_data = + _empty_right_table_need_probe_dispose = (_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) && (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 234374e3c0e..0de7ae11064 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -99,7 +99,7 @@ protected: virtual void _init_short_circuit_for_probe() { _short_circuit_for_probe = false; - _short_circuit_for_probe_and_additional_data = false; + _empty_right_table_need_probe_dispose = false; } TJoinOp::type _join_op; @@ -128,7 +128,7 @@ protected: bool _short_circuit_for_probe = false; // for some join, when build side rows is empty, we could return directly by add some additional null data in probe table. - bool _short_circuit_for_probe_and_additional_data = false; + bool _empty_right_table_need_probe_dispose = false; std::unique_ptr<RowDescriptor> _output_row_desc; std::unique_ptr<RowDescriptor> _intermediate_row_desc; // output expr diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index c9c4cc7e1ee..0e6b8a54db8 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -264,9 +264,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* return Status::OK(); } - if (scan_block == nullptr) { - return Status::Error<777>("not pointer in scan pipline"); - } // get scanner's block memory block->swap(*scan_block); _scanner_ctx->return_free_block(std::move(scan_block)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org