This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 826e832500d [UT](pipeline) Add UT cases for task execution (#49866) 826e832500d is described below commit 826e832500da6edd0a2255bb8551c5ee5c1b7906 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Apr 9 14:39:05 2025 +0800 [UT](pipeline) Add UT cases for task execution (#49866) --- be/src/common/status.h | 4 - be/src/pipeline/exec/operator.h | 38 +++++- be/src/pipeline/pipeline_task.cpp | 12 +- be/test/pipeline/pipeline_task_test.cpp | 215 +++++++++++++++++++++++++++++++- 4 files changed, 253 insertions(+), 16 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 841d5090699..5e1761a5e57 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -274,8 +274,6 @@ namespace ErrorCode { E(SEGCOMPACTION_INIT_READER, -3117, false); \ E(SEGCOMPACTION_INIT_WRITER, -3118, false); \ E(SEGCOMPACTION_FAILED, -3119, false); \ - E(PIP_WAIT_FOR_RF, -3120, false); \ - E(PIP_WAIT_FOR_SC, -3121, false); \ E(ROWSET_ADD_TO_BINLOG_FAILED, -3122, true); \ E(ROWSET_BINLOG_NOT_ONLY_ONE_VERSION, -3123, true); \ E(INVERTED_INDEX_INVALID_PARAMETERS, -6000, false); \ @@ -492,8 +490,6 @@ public: ERROR_CTOR(NotSupported, NOT_IMPLEMENTED_ERROR) ERROR_CTOR_NOSTACK(EndOfFile, END_OF_FILE) ERROR_CTOR(InternalError, INTERNAL_ERROR) - ERROR_CTOR_NOSTACK(WaitForRf, PIP_WAIT_FOR_RF) - ERROR_CTOR_NOSTACK(WaitForScannerContext, PIP_WAIT_FOR_SC) ERROR_CTOR(RuntimeError, RUNTIME_ERROR) ERROR_CTOR_NOSTACK(Cancelled, CANCELLED) ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0db9ee635e9..3062d0fc623 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -1094,8 +1094,24 @@ public: ENABLE_FACTORY_CREATOR(DummyOperatorLocalState); DummyOperatorLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState<FakeSharedState>(state, parent) {} + : PipelineXLocalState<FakeSharedState>(state, parent) { + _tmp_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); + _finish_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); + _filter_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); + } + Dependency* finishdependency() override { return _finish_dependency.get(); } ~DummyOperatorLocalState() = default; + + std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } + std::vector<Dependency*> filter_dependencies() override { return {_filter_dependency.get()}; } + +private: + std::shared_ptr<Dependency> _tmp_dependency; + std::shared_ptr<Dependency> _finish_dependency; + std::shared_ptr<Dependency> _filter_dependency; }; class DummyOperator final : public OperatorX<DummyOperatorLocalState> { @@ -1108,17 +1124,31 @@ public: *eos = _eos; return Status::OK(); } + void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode = true; } private: friend class AssertNumRowsLocalState; bool _eos = false; + bool _low_memory_mode = false; }; class DummySinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> { public: using Base = PipelineXSinkLocalState<BasicSharedState>; ENABLE_FACTORY_CREATOR(DummySinkLocalState); - DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} + DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { + _tmp_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); + _finish_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "DummyOperatorDependency", true); + } + + std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } + Dependency* finishdependency() override { return _finish_dependency.get(); } + +private: + std::shared_ptr<Dependency> _tmp_dependency; + std::shared_ptr<Dependency> _finish_dependency; }; class DummySinkOperatorX final : public DataSinkOperatorX<DummySinkLocalState> { @@ -1128,6 +1158,10 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { return Status::OK(); } + void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode = true; } + +private: + bool _low_memory_mode = false; }; #endif diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 54a271ea553..ffe335a4189 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -226,11 +226,7 @@ Status PipelineTask::_open() { SCOPED_TIMER(_open_timer); _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { - auto* local_state = _state->get_local_state(o->operator_id()); - auto st = local_state->open(_state); - DCHECK(st.is<ErrorCode::PIP_WAIT_FOR_RF>() ? !_filter_dependencies.empty() : true) - << debug_string(); - RETURN_IF_ERROR(st); + RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state)); } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); RETURN_IF_ERROR(_extract_dependencies()); @@ -334,8 +330,10 @@ void PipelineTask::terminate() { * @return */ Status PipelineTask::execute(bool* done) { - DCHECK(_exec_state == State::RUNNABLE) << debug_string(); - DCHECK(_blocked_dep == nullptr) << debug_string(); + if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) [[unlikely]] { + return Status::InternalError("Pipeline task is not runnable! Task info: {}", + debug_string()); + } auto fragment_context = _fragment_context.lock(); DCHECK(fragment_context); int64_t time_spent = 0; diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp index 456a36a6629..ef3ed753ff0 100644 --- a/be/test/pipeline/pipeline_task_test.cpp +++ b/be/test/pipeline/pipeline_task_test.cpp @@ -119,7 +119,7 @@ TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) { EXPECT_EQ(task->_exec_state, PipelineTask::State::INITED); } -TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) { +TEST_F(PipelineTaskTest, TEST_PREPARE) { auto num_instances = 1; auto pip_id = 0; auto task_id = 0; @@ -145,7 +145,6 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) { auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context, profile.get(), shared_state_map, task_id); { - // HAPPY PATH std::vector<TScanRangeParams> scan_range; int sender_id = 0; TDataSink tsink; @@ -154,7 +153,7 @@ TEST_F(PipelineTaskTest, TEST_PREPARE_HAPPY_PATH) { } } -TEST_F(PipelineTaskTest, TEST_PREPARE) { +TEST_F(PipelineTaskTest, TEST_PREPARE_ERROR) { auto num_instances = 1; auto pip_id = 0; auto task_id = 0; @@ -189,4 +188,214 @@ TEST_F(PipelineTaskTest, TEST_PREPARE) { } } +TEST_F(PipelineTaskTest, TEST_EXTRACT_DEPENDENCIES_ERROR) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id)); + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + { + EXPECT_FALSE(task->_extract_dependencies().ok()); + EXPECT_TRUE(task->_read_dependencies.empty()); + EXPECT_TRUE(task->_write_dependencies.empty()); + EXPECT_TRUE(task->_finish_dependencies.empty()); + EXPECT_TRUE(task->_spill_dependencies.empty()); + } +} + +TEST_F(PipelineTaskTest, TEST_OPEN) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id)); + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + { + std::vector<TScanRangeParams> scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + } + { + EXPECT_TRUE(task->_open().ok()); + EXPECT_FALSE(task->_read_dependencies.empty()); + EXPECT_FALSE(task->_write_dependencies.empty()); + EXPECT_FALSE(task->_finish_dependencies.empty()); + EXPECT_TRUE(task->_spill_dependencies.empty()); + EXPECT_TRUE(task->_opened); + } +} + +TEST_F(PipelineTaskTest, TEST_EXECUTE) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances); + Dependency* read_dep; + Dependency* write_dep; + Dependency* source_finish_dep; + { + OperatorPtr source_op; + // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id)); + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->set_task_queue(_task_queue.get()); + { + // `execute` should be called after `prepare` + bool done = false; + EXPECT_FALSE(task->execute(&done).ok()); + } + { + std::vector<TScanRangeParams> scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + EXPECT_FALSE(task->_filter_dependencies.empty()); + read_dep = _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) + .value() + ->dependencies() + .front(); + write_dep = _runtime_state->get_sink_local_state()->dependencies().front(); + } + { + // task is blocked by execution dependency. + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_FALSE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + EXPECT_FALSE(task->_opened); + EXPECT_FALSE(_query_ctx->get_execution_dependency()->ready()); + EXPECT_FALSE(_query_ctx->get_execution_dependency()->_blocked_task.empty()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED); + } + { + // task is blocked by filter dependency. + _query_ctx->get_execution_dependency()->set_ready(); + task->_filter_dependencies.front()->block(); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_FALSE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + EXPECT_FALSE(task->_opened); + EXPECT_FALSE(task->_filter_dependencies.front()->ready()); + EXPECT_FALSE(task->_filter_dependencies.front()->_blocked_task.empty()); + EXPECT_TRUE(task->_read_dependencies.empty()); + EXPECT_TRUE(task->_write_dependencies.empty()); + EXPECT_TRUE(task->_finish_dependencies.empty()); + EXPECT_TRUE(task->_spill_dependencies.empty()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED); + } + { + // `open` phase. And then task is blocked by read dependency. + task->_filter_dependencies.front()->set_ready(); + read_dep->block(); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_FALSE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + EXPECT_FALSE(task->_read_dependencies.empty()); + EXPECT_FALSE(task->_write_dependencies.empty()); + EXPECT_FALSE(task->_finish_dependencies.empty()); + EXPECT_TRUE(task->_spill_dependencies.empty()); + EXPECT_TRUE(task->_opened); + EXPECT_FALSE(read_dep->ready()); + EXPECT_TRUE(write_dep->ready()); + EXPECT_FALSE(read_dep->_blocked_task.empty()); + source_finish_dep = + _runtime_state->get_local_state_result(task->_operators.front()->operator_id()) + .value() + ->finishdependency(); + EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED); + } + { + // `execute` phase. And then task is blocked by finish dependency. + read_dep->set_ready(); + source_finish_dep->block(); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + task->_operators.front()->cast<DummyOperator>()._eos = true; + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + EXPECT_FALSE(source_finish_dep->ready()); + EXPECT_FALSE(source_finish_dep->_blocked_task.empty()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED); + } + { + // `execute` phase. + source_finish_dep->set_ready(); + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_TRUE(done); + EXPECT_FALSE(task->_wake_up_early); + EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE); + } + { + EXPECT_TRUE(task->close(Status::OK()).ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED); + EXPECT_TRUE(task->finalize().ok()); + EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED); + } +} + } // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org