This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ecf69b09b [pipeline](regression) nested loop join test get error 
result in pipeline engine and refactor the code for need more input data 
(#15208)
8ecf69b09b is described below

commit 8ecf69b09b034612967223a35cb65bad5c742967
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Dec 21 19:03:51 2022 +0800

    [pipeline](regression) nested loop join test get error result in pipeline 
engine and refactor the code for need more input data (#15208)
---
 be/src/pipeline/exec/operator.h                 | 22 +++---
 be/src/pipeline/exec/repeat_operator.cpp        | 11 +++
 be/src/pipeline/exec/repeat_operator.h          |  4 ++
 be/src/vec/exec/join/vhash_join_node.cpp        | 29 ++++----
 be/src/vec/exec/join/vnested_loop_join_node.cpp |  2 +-
 be/src/vec/exec/scan/vscan_node.cpp             |  3 +-
 be/src/vec/exec/vrepeat_node.cpp                | 90 +++++++++++--------------
 be/src/vec/exec/vrepeat_node.h                  |  5 +-
 be/src/vec/exec/vtable_function_node.cpp        | 34 ++++------
 be/src/vec/exec/vtable_function_node.h          |  4 +-
 10 files changed, 95 insertions(+), 109 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 652486aa34..58c36b3a51 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -432,24 +432,24 @@ public:
         auto& child = StreamingOperator<OperatorBuilderType>::_child;
 
         if (node->need_more_input_data()) {
+            _child_block->clear_column_data();
             RETURN_IF_ERROR(child->get_block(state, _child_block.get(), 
_child_source_state));
             source_state = _child_source_state;
-            if (_child_block->rows() == 0 && source_state != 
SourceState::FINISHED) {
+            if (_child_block->rows() == 0 && _child_source_state != 
SourceState::FINISHED) {
                 return Status::OK();
             }
             node->prepare_for_next();
-            node->push(state, _child_block.get(), source_state == 
SourceState::FINISHED);
+            node->push(state, _child_block.get(), _child_source_state == 
SourceState::FINISHED);
         }
 
-        bool eos = false;
-        RETURN_IF_ERROR(node->pull(state, block, &eos));
-        if (eos) {
-            source_state = SourceState::FINISHED;
-            _child_block->clear_column_data();
-        } else if (!node->need_more_input_data()) {
-            source_state = SourceState::MORE_DATA;
-        } else {
-            _child_block->clear_column_data();
+        if (!node->need_more_input_data()) {
+            bool eos = false;
+            RETURN_IF_ERROR(node->pull(state, block, &eos));
+            if (eos) {
+                source_state = SourceState::FINISHED;
+            } else if (!node->need_more_input_data()) {
+                source_state = SourceState::MORE_DATA;
+            }
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index def1f6da9d..d2c9f0a1e2 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -23,4 +23,15 @@ namespace doris::pipeline {
 
 OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)
 
+Status RepeatOperator::prepare(doris::RuntimeState* state) {
+    // Speed-up: borrow the node's child block. Dangerous: not owned here; close() release()s it.
+    _child_block.reset(_node->get_child_block());
+    return StatefulOperator::prepare(state);
+}
+
+Status RepeatOperator::close(doris::RuntimeState* state) {
+    _child_block.release();
+    return StatefulOperator::close(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/repeat_operator.h 
b/be/src/pipeline/exec/repeat_operator.h
index 15707ea39c..b397ea05d3 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -37,6 +37,10 @@ public:
 class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> {
 public:
     RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* 
repeat_node);
+
+    Status prepare(RuntimeState* state) override;
+
+    Status close(RuntimeState* state) override;
 };
 
 } // namespace pipeline
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 337aec7b1e..fc2a7ed4dc 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -597,24 +597,19 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
         *eos = true;
         return Status::OK();
     }
-    if (need_more_input_data()) {
+    while (need_more_input_data()) {
         prepare_for_next();
-        do {
-            SCOPED_TIMER(_probe_next_timer);
-            RETURN_IF_ERROR_AND_CHECK_SPAN(
-                    child(0)->get_next_after_projects(
-                            state, &_probe_block, &_probe_eos,
-                            std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*,
-                                                           bool*)) &
-                                              ExecNode::get_next,
-                                      _children[0], std::placeholders::_1, 
std::placeholders::_2,
-                                      std::placeholders::_3)),
-                    child(0)->get_next_span(), _probe_eos);
-        } while (_probe_block.rows() == 0 && !_probe_eos);
-
-        if (_probe_block.rows() != 0) {
-            RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos));
-        }
+        SCOPED_TIMER(_probe_next_timer);
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(0)->get_next_after_projects(
+                        state, &_probe_block, &_probe_eos,
+                        std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
+                                          ExecNode::get_next,
+                                  _children[0], std::placeholders::_1, 
std::placeholders::_2,
+                                  std::placeholders::_3)),
+                child(0)->get_next_span(), _probe_eos);
+
+        RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos));
     }
 
     return pull(state, output_block, eos);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 82cb6f8605..7fc43fcf19 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -671,7 +671,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, 
vectorized::Block* block,
 }
 
 bool VNestedLoopJoinNode::need_more_input_data() const {
-    return _need_more_input_data;
+    return _need_more_input_data and !_left_side_eos;
 }
 
 void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) {
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 8d436bb733..2f35cd47fc 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -262,8 +262,7 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
                    !_runtime_filter_ctxs[i].apply_mark) {
             _blocked_by_rf = true;
         } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(!_blocked_by_rf &&
-                   runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
+            DCHECK(runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
             _is_all_rf_applied = false;
         }
     }
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index 961e5aad4b..01db7d29ef 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -55,7 +55,6 @@ Status VRepeatNode::prepare(RuntimeState* state) {
     for (const auto& slot_desc : _output_tuple_desc->slots()) {
         _output_slots.push_back(slot_desc);
     }
-    _child_block.reset(new Block());
 
     return Status::OK();
 }
@@ -181,50 +180,51 @@ Status VRepeatNode::pull(doris::RuntimeState* state, 
vectorized::Block* output_b
     }
     DCHECK(output_block->rows() == 0);
 
-    if (!_intermediate_block || _intermediate_block->rows() == 0) {
-        return Status::OK();
-    }
+    if (_intermediate_block && _intermediate_block->rows() > 0) {
+        RETURN_IF_ERROR(
+                get_repeated_block(_intermediate_block.get(), _repeat_id_idx, 
output_block));
 
-    RETURN_IF_ERROR(get_repeated_block(_intermediate_block.get(), 
_repeat_id_idx, output_block));
+        _repeat_id_idx++;
 
-    _repeat_id_idx++;
-
-    int size = _repeat_id_list.size();
-    if (_repeat_id_idx >= size) {
-        _intermediate_block->clear();
-        release_block_memory(*_child_block);
-        _repeat_id_idx = 0;
+        int size = _repeat_id_list.size();
+        if (_repeat_id_idx >= size) {
+            _intermediate_block->clear();
+            release_block_memory(_child_block);
+            _repeat_id_idx = 0;
+        }
     }
 
+    *eos = _child_eos && _child_block.rows() == 0;
     reached_limit(output_block, eos);
     COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     return Status::OK();
 }
 
 Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, 
bool eos) {
-    if (input_block->rows() == 0) {
-        return Status::OK();
-    }
+    _child_eos = eos;
     DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
     DCHECK(!_expr_ctxs.empty());
-    _intermediate_block.reset(new Block());
-
-    for (auto expr : _expr_ctxs) {
-        int result_column_id = -1;
-        RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
-        DCHECK(result_column_id != -1);
-        input_block->get_by_position(result_column_id).column =
-                input_block->get_by_position(result_column_id)
-                        .column->convert_to_full_column_if_const();
-        
_intermediate_block->insert(input_block->get_by_position(result_column_id));
+
+    if (input_block->rows() > 0) {
+        _intermediate_block.reset(new Block());
+
+        for (auto expr : _expr_ctxs) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
+            DCHECK(result_column_id != -1);
+            input_block->get_by_position(result_column_id).column =
+                    input_block->get_by_position(result_column_id)
+                            .column->convert_to_full_column_if_const();
+            
_intermediate_block->insert(input_block->get_by_position(result_column_id));
+        }
+        DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns());
     }
-    DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns());
 
     return Status::OK();
 }
 
 bool VRepeatNode::need_more_input_data() {
-    return !_intermediate_block || _intermediate_block->rows() == 0;
+    return !_child_block.rows() && !_child_eos;
 }
 
 Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
@@ -241,26 +241,17 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
         DCHECK(_repeat_id_idx <= (int)v.size());
     }
     DCHECK(block->rows() == 0);
-
-    if (need_more_input_data()) {
-        while (_child_block->rows() == 0 && !_child_eos) {
-            RETURN_IF_ERROR_AND_CHECK_SPAN(
-                    child(0)->get_next_after_projects(
-                            state, _child_block.get(), &_child_eos,
-                            std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*,
-                                                           bool*)) &
-                                              ExecNode::get_next,
-                                      _children[0], std::placeholders::_1, 
std::placeholders::_2,
-                                      std::placeholders::_3)),
-                    child(0)->get_next_span(), _child_eos);
-        }
-
-        if (_child_eos and _child_block->rows() == 0) {
-            *eos = true;
-            return Status::OK();
-        }
-
-        push(state, _child_block.get(), *eos);
+    while (need_more_input_data()) {
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(0)->get_next_after_projects(
+                        state, &_child_block, &_child_eos,
+                        std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
+                                          ExecNode::get_next,
+                                  _children[0], std::placeholders::_1, 
std::placeholders::_2,
+                                  std::placeholders::_3)),
+                child(0)->get_next_span(), _child_eos);
+
+        push(state, &_child_block, _child_eos);
     }
 
     return pull(state, block, eos);
@@ -294,9 +285,4 @@ void VRepeatNode::debug_string(int indentation_level, 
std::stringstream* out) co
     *out << ")";
 }
 
-void VRepeatNode::_release_mem() {
-    _child_block = nullptr;
-    _intermediate_block = nullptr;
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h
index 53eb025cc1..394690d729 100644
--- a/be/src/vec/exec/vrepeat_node.h
+++ b/be/src/vec/exec/vrepeat_node.h
@@ -46,6 +46,7 @@ public:
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
override;
     bool need_more_input_data();
+    Block* get_child_block() { return &_child_block; }
 
 protected:
     virtual void debug_string(int indentation_level, std::stringstream* out) 
const override;
@@ -53,8 +54,6 @@ protected:
 private:
     Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* 
output_block);
 
-    void _release_mem();
-
     // Slot id set used to indicate those slots need to set to null.
     std::vector<std::set<SlotId>> _slot_id_set_list;
     // all slot id
@@ -65,7 +64,7 @@ private:
     TupleId _output_tuple_id;
     const TupleDescriptor* _output_tuple_desc;
 
-    std::unique_ptr<Block> _child_block {};
+    Block _child_block;
     std::unique_ptr<Block> _intermediate_block {};
 
     std::vector<SlotDescriptor*> _output_slots;
diff --git a/be/src/vec/exec/vtable_function_node.cpp 
b/be/src/vec/exec/vtable_function_node.cpp
index 9336040f9a..c26bfdba21 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -83,29 +83,20 @@ Status VTableFunctionNode::get_next(RuntimeState* state, 
Block* block, bool* eos
     RETURN_IF_CANCELLED(state);
 
     // if child_block is empty, get data from child.
-    if (need_more_input_data()) {
-        while (_child_block.rows() == 0 && !_child_eos) {
-            RETURN_IF_ERROR_AND_CHECK_SPAN(
-                    child(0)->get_next_after_projects(
-                            state, &_child_block, &_child_eos,
-                            std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*,
-                                                           bool*)) &
-                                              ExecNode::get_next,
-                                      _children[0], std::placeholders::_1, 
std::placeholders::_2,
-                                      std::placeholders::_3)),
-                    child(0)->get_next_span(), _child_eos);
-        }
-        if (_child_eos && _child_block.rows() == 0) {
-            *eos = true;
-            return Status::OK();
-        }
-
-        push(state, &_child_block, *eos);
+    while (need_more_input_data()) {
+        RETURN_IF_ERROR_AND_CHECK_SPAN(
+                child(0)->get_next_after_projects(
+                        state, &_child_block, &_child_eos,
+                        std::bind((Status(ExecNode::*)(RuntimeState*, 
vectorized::Block*, bool*)) &
+                                          ExecNode::get_next,
+                                  _children[0], std::placeholders::_1, 
std::placeholders::_2,
+                                  std::placeholders::_3)),
+                child(0)->get_next_span(), _child_eos);
+
+        push(state, &_child_block, _child_eos);
     }
 
-    pull(state, block, eos);
-
-    return Status::OK();
+    return pull(state, block, eos);
 }
 
 Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* 
output_block, bool* eos) {
@@ -204,6 +195,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* 
state, Block* output
     RETURN_IF_ERROR(
             VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
 
+    *eos = _child_eos && _cur_child_offset == -1;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vtable_function_node.h 
b/be/src/vec/exec/vtable_function_node.h
index 451a35c739..c831e55856 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -30,10 +30,10 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
     Status prepare(RuntimeState* state) override;
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
-    bool need_more_input_data() { return !_child_block.rows(); }
+    bool need_more_input_data() { return !_child_block.rows() && !_child_eos; }
 
     Status push(RuntimeState*, vectorized::Block* input_block, bool eos) 
override {
+        _child_eos = eos;
         if (input_block->rows() == 0) {
             return Status::OK();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to