This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d1918093727d780809496b01c5aebfc528aa7b15 Author: zclllyybb <zhaochan...@selectdb.com> AuthorDate: Fri Jan 26 22:12:48 2024 +0800 [fix](pipeline) Fix non-prepared execute of UnionOperator (#30355) --- be/src/exec/exec_node.h | 11 +++++++++++ be/src/pipeline/exec/union_source_operator.cpp | 4 +++- be/src/vec/exec/vunion_node.cpp | 3 ++- be/src/vec/exec/vunion_node.h | 4 +++- be/src/vec/exprs/vcast_expr.cpp | 12 +++++------- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 5a6b04667e7..1dd8979f5b3 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -78,6 +78,17 @@ public: // If overridden in subclass, must first call superclass's prepare(). [[nodiscard]] virtual Status prepare(RuntimeState* state); + /* + * For open and alloc_resource: + * Base class ExecNode's `open` only calls `alloc_resource`, which opens some public projections. + * If overridden, `open` must call the corresponding `alloc_resource`, since it is an (early) part of opening. + * Alternatively, just call `ExecNode::open`. + * Then `alloc_resource` calls the parent class's version after its own work to complete the process, including the projections. + * In Pipeline engine: + * PipeContext::prepare -> node::prepare + * Task::open -> StreamingOp::open -> node::alloc_resource, for sink+source splits, only open in SinkOperator. + * So in the pipeline engine, work done directly by `open` (like calling the children's `open`) is not performed. + */ // Performs any preparatory work prior to calling get_next(). // Can be called repeatedly (after calls to close()). // Caller must not be holding any io buffers. This will cause deadlock. 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 8cfec9d3625..18b17b85a61 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -54,8 +54,10 @@ bool UnionSourceOperator::_has_data() { // we assumed it can read to process const expr, Although we don't know whether there is // ,and queue have data, could read also +// The source operator's run depends on the node's alloc_resource, which is called in the sink's open. +// So block until the SinkOperator has been scheduled to open. bool UnionSourceOperator::can_read() { - return _has_data() || _data_queue->is_all_finish(); + return _node->resource_allocated() && (_has_data() || _data_queue->is_all_finish()); } Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, bool* eos) { diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 08b6f8c5bbd..0dc6a408a4a 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -80,6 +80,7 @@ Status VUnionNode::prepare(RuntimeState* state) { SCOPED_TIMER(_exec_timer); _materialize_exprs_evaluate_timer = ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer"); + // Prepare const expr lists. for (const VExprContextSPtrs& exprs : _const_expr_lists) { RETURN_IF_ERROR(VExpr::prepare(exprs, state, _row_descriptor)); @@ -93,7 +94,7 @@ Status VUnionNode::prepare(RuntimeState* state) { } Status VUnionNode::open(RuntimeState* state) { - RETURN_IF_ERROR(alloc_resource(state)); + RETURN_IF_ERROR(ExecNode::open(state)); // exactly the same as this->alloc_resource() // Ensures that rows are available for clients to fetch after this open() has // succeeded. 
if (!_children.empty()) { diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h index ac63f9ca632..492cb9a98a8 100644 --- a/be/src/vec/exec/vunion_node.h +++ b/be/src/vec/exec/vunion_node.h @@ -18,8 +18,8 @@ #pragma once #include <glog/logging.h> -#include <stddef.h> +#include <cstddef> #include <iosfwd> #include <vector> @@ -64,6 +64,8 @@ public: /// GetNext() for the constant expression case. Status get_next_const(RuntimeState* state, Block* block); + bool resource_allocated() const { return _resource_allocated; } + private: /// Const exprs materialized by this node. These exprs don't refer to any children. /// Only materialized by the first fragment instance to avoid duplication. diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index f322c1d2fae..56072967058 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -20,13 +20,10 @@ #include <fmt/format.h> #include <gen_cpp/Types_types.h> #include <glog/logging.h> -#include <stddef.h> -#include <algorithm> -#include <exception> +#include <cstddef> #include <memory> #include <ostream> -#include <vector> #include "common/exception.h" #include "common/status.h" @@ -87,8 +84,8 @@ const DataTypePtr& VCastExpr::get_target_type() const { doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { DCHECK(_prepare_finished); - for (int i = 0; i < _children.size(); ++i) { - RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + for (auto& i : _children) { + RETURN_IF_ERROR(i->open(state, context, scope)); } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { @@ -105,7 +102,8 @@ void VCastExpr::close(VExprContext* context, FunctionContext::FunctionStateScope doris::Status VCastExpr::execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id) { - DCHECK(_open_finished || 
_getting_const_col); + DCHECK(_open_finished || _getting_const_col) + << _open_finished << _getting_const_col << _expr_name; // for each child call execute int column_id = 0; RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org