This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c020be4d24 [Bug](join) corner case causes the mark join + null aware 
left join to core dump in regression test in pipeline query engine (#25087)
5c020be4d24 is described below

commit 5c020be4d247c6c762cff53e2c1f9d28efa68b20
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Sun Oct 8 22:50:12 2023 +0800

    [Bug](join) corner case causes the mark join + null aware left join to core 
dump in regression test in pipeline query engine (#25087)
---
 be/src/pipeline/exec/operator.h          |  7 +---
 be/src/pipeline/pipeline_task.cpp        |  7 +---
 be/src/vec/exec/join/vhash_join_node.cpp | 70 +++++++++++++++-----------------
 be/src/vec/exec/join/vhash_join_node.h   |  4 +-
 be/src/vec/exec/join/vjoin_node_base.h   |  4 +-
 be/src/vec/exec/scan/vscan_node.cpp      |  3 --
 6 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 73e2d5d41b8..4ba2aec977f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -450,12 +450,7 @@ public:
 
         if (node->need_more_input_data()) {
             _child_block->clear_column_data();
-            Status status = child->get_block(state, _child_block.get(), 
_child_source_state);
-            if (status.is<777>()) {
-                LOG(INFO) << "Scan block nullptr error _source_state:" << 
int(source_state)
-                          << " query id:" << print_id(state->query_id());
-            }
-            RETURN_IF_ERROR(status);
+            RETURN_IF_ERROR(child->get_block(state, _child_block.get(), 
_child_source_state));
             source_state = _child_source_state;
             if (_child_block->rows() == 0 && _child_source_state != 
SourceState::FINISHED) {
                 return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 8061253d2ec..a0f77578e73 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -289,12 +289,7 @@ Status PipelineTask::execute(bool* eos) {
         {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            auto status = _root->get_block(_state, block, _data_state);
-            if (status.is<777>()) {
-                LOG(FATAL) << "Scan block nullptr error: can read:" << 
source_can_read()
-                           << " query id:" << print_id(_state->query_id());
-            }
-            RETURN_IF_ERROR(status);
+            RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
         }
         *eos = _data_state == SourceState::FINISHED;
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 75ea6b06ba8..aa91846cc8b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -418,7 +418,7 @@ Status HashJoinNode::close(RuntimeState* state) {
 
 bool HashJoinNode::need_more_input_data() const {
     return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) 
&& !_probe_eos &&
-           (!_short_circuit_for_probe || _is_mark_join);
+           !_short_circuit_for_probe;
 }
 
 void HashJoinNode::prepare_for_next() {
@@ -430,45 +430,46 @@ void HashJoinNode::prepare_for_next() {
 Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* 
output_block, bool* eos) {
     SCOPED_TIMER(_probe_timer);
     if (_short_circuit_for_probe) {
-        /// If `_short_circuit_for_probe` is true, this indicates no rows
+        // If we use a short-circuit strategy, should return empty block 
directly.
+        *eos = true;
+        return Status::OK();
+    }
+
+    if (_short_circuit_for_null_in_probe_side && _is_mark_join) {
+        /// If `_short_circuit_for_null_in_probe_side` is true, this indicates 
no rows
         /// match the join condition, and this is 'mark join', so we need to 
create a column as mark
         /// with all rows set to 0.
-        if (_is_mark_join) {
-            auto block_rows = _probe_block.rows();
-            if (block_rows == 0) {
-                *eos = _probe_eos;
-                return Status::OK();
-            }
-
-            Block temp_block;
-            //get probe side output column
-            for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
-                if (_left_output_slot_flags[i]) {
-                    temp_block.insert(_probe_block.get_by_position(i));
-                }
-            }
-            auto mark_column = ColumnUInt8::create(block_rows, 0);
-            temp_block.insert({std::move(mark_column), 
std::make_shared<DataTypeUInt8>(), ""});
+        auto block_rows = _probe_block.rows();
+        if (block_rows == 0) {
+            *eos = _probe_eos;
+            return Status::OK();
+        }
 
-            {
-                SCOPED_TIMER(_join_filter_timer);
-                RETURN_IF_ERROR(
-                        VExprContext::filter_block(_conjuncts, &temp_block, 
temp_block.columns()));
+        Block temp_block;
+        //get probe side output column
+        for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+            if (_left_output_slot_flags[i]) {
+                temp_block.insert(_probe_block.get_by_position(i));
             }
+        }
+        auto mark_column = ColumnUInt8::create(block_rows, 0);
+        temp_block.insert({std::move(mark_column), 
std::make_shared<DataTypeUInt8>(), ""});
 
-            RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, 
false));
-            temp_block.clear();
-            release_block_memory(_probe_block);
-            reached_limit(output_block, eos);
-            return Status::OK();
+        {
+            SCOPED_TIMER(_join_filter_timer);
+            RETURN_IF_ERROR(
+                    VExprContext::filter_block(_conjuncts, &temp_block, 
temp_block.columns()));
         }
-        // If we use a short-circuit strategy, should return empty block 
directly.
-        *eos = true;
+
+        RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
+        temp_block.clear();
+        release_block_memory(_probe_block);
+        reached_limit(output_block, eos);
         return Status::OK();
     }
 
     //TODO: this short circuit maybe could refactor, no need to check at here.
-    if (_short_circuit_for_probe_and_additional_data) {
+    if (_empty_right_table_need_probe_dispose) {
         // when build table rows is 0 and not have other_join_conjunct and 
join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         // we could get the result is probe table + null-column(if need output)
         // If we use a short-circuit strategy, should return block directly by 
add additional null data.
@@ -641,13 +642,8 @@ Status HashJoinNode::push(RuntimeState* /*state*/, 
vectorized::Block* input_bloc
 Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* 
eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
-    if (_is_hash_join_early_start_probe_eos(state)) {
-        *eos = true;
-        return Status::OK();
-    }
-
-    if (_short_circuit_for_probe && !_is_mark_join) {
-        // If we use a short-circuit strategy, should return empty block 
directly.
+    // If we use a short-circuit strategy, should return empty block directly.
+    if (_is_hash_join_early_start_probe_eos(state) || 
_short_circuit_for_probe) {
         *eos = true;
         return Status::OK();
     }
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index c60f1a0c7ae..c75ab58357c 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -577,7 +577,7 @@ private:
     void _init_short_circuit_for_probe() override {
         _short_circuit_for_probe =
                 (_short_circuit_for_null_in_probe_side &&
-                 _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
+                 _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN 
&& !_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
@@ -586,7 +586,7 @@ private:
 
         //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
-        _short_circuit_for_probe_and_additional_data =
+        _empty_right_table_need_probe_dispose =
                 (_build_blocks->empty() && !_have_other_join_conjunct && 
!_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index 234374e3c0e..0de7ae11064 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -99,7 +99,7 @@ protected:
 
     virtual void _init_short_circuit_for_probe() {
         _short_circuit_for_probe = false;
-        _short_circuit_for_probe_and_additional_data = false;
+        _empty_right_table_need_probe_dispose = false;
     }
 
     TJoinOp::type _join_op;
@@ -128,7 +128,7 @@ protected:
     bool _short_circuit_for_probe = false;
 
     // for some join, when build side rows is empty, we could return directly 
by add some additional null data in probe table.
-    bool _short_circuit_for_probe_and_additional_data = false;
+    bool _empty_right_table_need_probe_dispose = false;
     std::unique_ptr<RowDescriptor> _output_row_desc;
     std::unique_ptr<RowDescriptor> _intermediate_row_desc;
     // output expr
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index c9c4cc7e1ee..0e6b8a54db8 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -264,9 +264,6 @@ Status VScanNode::get_next(RuntimeState* state, 
vectorized::Block* block, bool*
         return Status::OK();
     }
 
-    if (scan_block == nullptr) {
-        return Status::Error<777>("not pointer in scan pipline");
-    }
     // get scanner's block memory
     block->swap(*scan_block);
     _scanner_ctx->return_free_block(std::move(scan_block));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to