This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d1918093727d780809496b01c5aebfc528aa7b15
Author: zclllyybb <zhaochan...@selectdb.com>
AuthorDate: Fri Jan 26 22:12:48 2024 +0800

    [fix](pipeline) Fix non-prepared execute of UnionOperator (#30355)
---
 be/src/exec/exec_node.h                        | 11 +++++++++++
 be/src/pipeline/exec/union_source_operator.cpp |  4 +++-
 be/src/vec/exec/vunion_node.cpp                |  3 ++-
 be/src/vec/exec/vunion_node.h                  |  4 +++-
 be/src/vec/exprs/vcast_expr.cpp                | 12 +++++-------
 5 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 5a6b04667e7..1dd8979f5b3 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -78,6 +78,17 @@ public:
     // If overridden in subclass, must first call superclass's prepare().
     [[nodiscard]] virtual Status prepare(RuntimeState* state);
 
+    /*
+     * For open and alloc_resource:
+     *  Base class ExecNode's `open` only calls `alloc_resource`, which opens some public projections.
+     *  If was overrided, `open` must call corresponding `alloc_resource` since it's a (early) part of opening.
+     *  Or just call `ExecNode::open` is alternative way.
+     *  Then `alloc_resource` call father's after it's own business to make the progress completed, including the projections.
+     *  In Pipeline engine: 
+     *      PipeContext::prepare -> node::prepare
+     *      Task::open -> StreamingOp::open -> node::alloc_resource, for sink+source splits, only open in SinkOperator.
+     *  So in pipeline, the things directly done by open(like call child's) wouldn't be done in `open`.
+    */
     // Performs any preparatory work prior to calling get_next().
     // Can be called repeatedly (after calls to close()).
     // Caller must not be holding any io buffers. This will cause deadlock.
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 8cfec9d3625..18b17b85a61 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -54,8 +54,10 @@ bool UnionSourceOperator::_has_data() {
 
 // we assumed it can read to process const expr, Although we don't know whether there is
 // ,and queue have data, could read also
+// The source operator's run dependences on Node's alloc_resource, which is called in Sink's open.
+// So hang until SinkOperator was scheduled to open.
 bool UnionSourceOperator::can_read() {
-    return _has_data() || _data_queue->is_all_finish();
+    return _node->resource_allocated() && (_has_data() || _data_queue->is_all_finish());
 }
 
 Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, bool* eos) {
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 08b6f8c5bbd..0dc6a408a4a 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -80,6 +80,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_exec_timer);
     _materialize_exprs_evaluate_timer =
             ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer");
+
     // Prepare const expr lists.
     for (const VExprContextSPtrs& exprs : _const_expr_lists) {
         RETURN_IF_ERROR(VExpr::prepare(exprs, state, _row_descriptor));
@@ -93,7 +94,7 @@ Status VUnionNode::prepare(RuntimeState* state) {
 }
 
 Status VUnionNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(alloc_resource(state));
+    RETURN_IF_ERROR(ExecNode::open(state)); // exactly same with this->alloc_resource()
     // Ensures that rows are available for clients to fetch after this open() has
     // succeeded.
     if (!_children.empty()) {
diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h
index ac63f9ca632..492cb9a98a8 100644
--- a/be/src/vec/exec/vunion_node.h
+++ b/be/src/vec/exec/vunion_node.h
@@ -18,8 +18,8 @@
 #pragma once
 
 #include <glog/logging.h>
-#include <stddef.h>
 
+#include <cstddef>
 #include <iosfwd>
 #include <vector>
 
@@ -64,6 +64,8 @@ public:
     /// GetNext() for the constant expression case.
     Status get_next_const(RuntimeState* state, Block* block);
 
+    bool resource_allocated() const { return _resource_allocated; }
+
 private:
     /// Const exprs materialized by this node. These exprs don't refer to any children.
     /// Only materialized by the first fragment instance to avoid duplication.
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index f322c1d2fae..56072967058 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -20,13 +20,10 @@
 #include <fmt/format.h>
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
-#include <stddef.h>
 
-#include <algorithm>
-#include <exception>
+#include <cstddef>
 #include <memory>
 #include <ostream>
-#include <vector>
 
 #include "common/exception.h"
 #include "common/status.h"
@@ -87,8 +84,8 @@ const DataTypePtr& VCastExpr::get_target_type() const {
 doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context,
                               FunctionContext::FunctionStateScope scope) {
     DCHECK(_prepare_finished);
-    for (int i = 0; i < _children.size(); ++i) {
-        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    for (auto& i : _children) {
+        RETURN_IF_ERROR(i->open(state, context, scope));
     }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
@@ -105,7 +102,8 @@ void VCastExpr::close(VExprContext* context, FunctionContext::FunctionStateScope
 
 doris::Status VCastExpr::execute(VExprContext* context, doris::vectorized::Block* block,
                                  int* result_column_id) {
-    DCHECK(_open_finished || _getting_const_col);
+    DCHECK(_open_finished || _getting_const_col)
+            << _open_finished << _getting_const_col << _expr_name;
     // for each child call execute
     int column_id = 0;
     RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to