This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 826e832500d [UT](pipeline) Add UT cases for task execution (#49866)
826e832500d is described below

commit 826e832500da6edd0a2255bb8551c5ee5c1b7906
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Apr 9 14:39:05 2025 +0800

    [UT](pipeline) Add UT cases for task execution (#49866)
---
 be/src/common/status.h                  |   4 -
 be/src/pipeline/exec/operator.h         |  38 +++++-
 be/src/pipeline/pipeline_task.cpp       |  12 +-
 be/test/pipeline/pipeline_task_test.cpp | 215 +++++++++++++++++++++++++++++++-
 4 files changed, 253 insertions(+), 16 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 841d5090699..5e1761a5e57 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -274,8 +274,6 @@ namespace ErrorCode {
     E(SEGCOMPACTION_INIT_READER, -3117, false);              \
     E(SEGCOMPACTION_INIT_WRITER, -3118, false);              \
     E(SEGCOMPACTION_FAILED, -3119, false);                   \
-    E(PIP_WAIT_FOR_RF, -3120, false);                        \
-    E(PIP_WAIT_FOR_SC, -3121, false);                        \
     E(ROWSET_ADD_TO_BINLOG_FAILED, -3122, true);             \
     E(ROWSET_BINLOG_NOT_ONLY_ONE_VERSION, -3123, true);      \
     E(INVERTED_INDEX_INVALID_PARAMETERS, -6000, false);      \
@@ -492,8 +490,6 @@ public:
     ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR)
     ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
-    ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF)
-    ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
     ERROR_CTOR_NOSTACK(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0db9ee635e9..3062d0fc623 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1094,8 +1094,24 @@ public:
     ENABLE_FACTORY_CREATOR(DummyOperatorLocalState);
 
     DummyOperatorLocalState(RuntimeState* state, OperatorXBase* parent)
-            : PipelineXLocalState<FakeSharedState>(state, parent) {}
+            : PipelineXLocalState<FakeSharedState>(state, parent) {
+        _tmp_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                    "DummyOperatorDependency", 
true);
+        _finish_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                       
"DummyOperatorDependency", true);
+        _filter_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                       
"DummyOperatorDependency", true);
+    }
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
     ~DummyOperatorLocalState() = default;
+
+    std::vector<Dependency*> dependencies() const override { return 
{_tmp_dependency.get()}; }
+    std::vector<Dependency*> filter_dependencies() override { return 
{_filter_dependency.get()}; }
+
+private:
+    std::shared_ptr<Dependency> _tmp_dependency;
+    std::shared_ptr<Dependency> _finish_dependency;
+    std::shared_ptr<Dependency> _filter_dependency;
 };
 
 class DummyOperator final : public OperatorX<DummyOperatorLocalState> {
@@ -1108,17 +1124,31 @@ public:
         *eos = _eos;
         return Status::OK();
     }
+    void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode 
= true; }
 
 private:
     friend class AssertNumRowsLocalState;
     bool _eos = false;
+    bool _low_memory_mode = false;
 };
 
 class DummySinkLocalState final : public 
PipelineXSinkLocalState<BasicSharedState> {
 public:
     using Base = PipelineXSinkLocalState<BasicSharedState>;
     ENABLE_FACTORY_CREATOR(DummySinkLocalState);
-    DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {}
+    DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {
+        _tmp_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                    "DummyOperatorDependency", 
true);
+        _finish_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                       
"DummyOperatorDependency", true);
+    }
+
+    std::vector<Dependency*> dependencies() const override { return 
{_tmp_dependency.get()}; }
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+
+private:
+    std::shared_ptr<Dependency> _tmp_dependency;
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 class DummySinkOperatorX final : public DataSinkOperatorX<DummySinkLocalState> 
{
@@ -1128,6 +1158,10 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override {
         return Status::OK();
     }
+    void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode 
= true; }
+
+private:
+    bool _low_memory_mode = false;
 };
 #endif
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 54a271ea553..ffe335a4189 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -226,11 +226,7 @@ Status PipelineTask::_open() {
     SCOPED_TIMER(_open_timer);
     _dry_run = _sink->should_dry_run(_state);
     for (auto& o : _operators) {
-        auto* local_state = _state->get_local_state(o->operator_id());
-        auto st = local_state->open(_state);
-        DCHECK(st.is<ErrorCode::PIP_WAIT_FOR_RF>() ? 
!_filter_dependencies.empty() : true)
-                << debug_string();
-        RETURN_IF_ERROR(st);
+        
RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state));
     }
     RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
     RETURN_IF_ERROR(_extract_dependencies());
@@ -334,8 +330,10 @@ void PipelineTask::terminate() {
  * @return
  */
 Status PipelineTask::execute(bool* done) {
-    DCHECK(_exec_state == State::RUNNABLE) << debug_string();
-    DCHECK(_blocked_dep == nullptr) << debug_string();
+    if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) 
[[unlikely]] {
+        return Status::InternalError("Pipeline task is not runnable! Task 
info: {}",
+                                     debug_string());
+    }
     auto fragment_context = _fragment_context.lock();
     DCHECK(fragment_context);
     int64_t time_spent = 0;
diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp
index 456a36a6629..ef3ed753ff0 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -119,7 +119,7 @@ TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
     EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED);
 }
 
-TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
+TEST_F(PipelineTaskTest, TEST_PREPARE) {
     auto num_instances = 1;
     auto pip_id = 0;
     auto task_id = 0;
@@ -145,7 +145,6 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
     auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
                                                profile.get(), 
shared_state_map, task_id);
     {
-        // HAPPY PATH
         std::vector<TScanRangeParams> scan_range;
         int sender_id = 0;
         TDataSink tsink;
@@ -154,7 +153,7 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) {
     }
 }
 
-TEST_F(PipelineTaskTest, TEST_PREPARE) {
+TEST_F(PipelineTaskTest, TEST_PREPARE_ERROR) {
     auto num_instances = 1;
     auto pip_id = 0;
     auto task_id = 0;
@@ -189,4 +188,214 @@ TEST_F(PipelineTaskTest, TEST_PREPARE) {
     }
 }
 
+TEST_F(PipelineTaskTest, TEST_EXTRACT_DEPENDENCIES_ERROR) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    {
+        OperatorPtr source_op;
+        // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    {
+        EXPECT_FALSE(task->_extract_dependencies().ok());
+        EXPECT_TRUE(task->_read_dependencies.empty());
+        EXPECT_TRUE(task->_write_dependencies.empty());
+        EXPECT_TRUE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+    }
+}
+
+TEST_F(PipelineTaskTest, TEST_OPEN) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    {
+        OperatorPtr source_op;
+        // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+    }
+    {
+        EXPECT_TRUE(task->_open().ok());
+        EXPECT_FALSE(task->_read_dependencies.empty());
+        EXPECT_FALSE(task->_write_dependencies.empty());
+        EXPECT_FALSE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_TRUE(task->_opened);
+    }
+}
+
+TEST_F(PipelineTaskTest, TEST_EXECUTE) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    Dependency* read_dep;
+    Dependency* write_dep;
+    Dependency* source_finish_dep;
+    {
+        OperatorPtr source_op;
+        // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->set_task_queue(_task_queue.get());
+    {
+        // `execute` should be called after `prepare`
+        bool done = false;
+        EXPECT_FALSE(task->execute(&done).ok());
+    }
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        EXPECT_FALSE(task->_filter_dependencies.empty());
+        read_dep = 
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                           .value()
+                           ->dependencies()
+                           .front();
+        write_dep = 
_runtime_state->get_sink_local_state()->dependencies().front();
+    }
+    {
+        // task is blocked by execution dependency.
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_opened);
+        EXPECT_FALSE(_query_ctx->get_execution_dependency()->ready());
+        
EXPECT_FALSE(_query_ctx->get_execution_dependency()->_blocked_task.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // task is blocked by filter dependency.
+        _query_ctx->get_execution_dependency()->set_ready();
+        task->_filter_dependencies.front()->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_opened);
+        EXPECT_FALSE(task->_filter_dependencies.front()->ready());
+        
EXPECT_FALSE(task->_filter_dependencies.front()->_blocked_task.empty());
+        EXPECT_TRUE(task->_read_dependencies.empty());
+        EXPECT_TRUE(task->_write_dependencies.empty());
+        EXPECT_TRUE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // `open` phase. And then task is blocked by read dependency.
+        task->_filter_dependencies.front()->set_ready();
+        read_dep->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_read_dependencies.empty());
+        EXPECT_FALSE(task->_write_dependencies.empty());
+        EXPECT_FALSE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_spill_dependencies.empty());
+        EXPECT_TRUE(task->_opened);
+        EXPECT_FALSE(read_dep->ready());
+        EXPECT_TRUE(write_dep->ready());
+        EXPECT_FALSE(read_dep->_blocked_task.empty());
+        source_finish_dep =
+                
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                        .value()
+                        ->finishdependency();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // `execute` phase. And then task is blocked by finish dependency.
+        read_dep->set_ready();
+        source_finish_dep->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        task->_operators.front()->cast<DummyOperator>()._eos = true;
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(source_finish_dep->ready());
+        EXPECT_FALSE(source_finish_dep->_blocked_task.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        // `execute` phase.
+        source_finish_dep->set_ready();
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_TRUE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+    }
+    {
+        EXPECT_TRUE(task->close(Status::OK()).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+        EXPECT_TRUE(task->finalize().ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+    }
+}
+
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to