This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fdc063101f9 [pipeline](refactor) remove pipeline task state (#34527)
fdc063101f9 is described below

commit fdc063101f92de82586b4b1e8cbda6a4c4f0b5cb
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri May 10 09:30:39 2024 +0800

    [pipeline](refactor) remove pipeline task state (#34527)
---
 be/src/pipeline/dependency.cpp     |  9 ------
 be/src/pipeline/dependency.h       | 18 -----------
 be/src/pipeline/pipeline_task.cpp  | 43 +++++++++----------------
 be/src/pipeline/pipeline_task.h    | 64 +++-----------------------------------
 be/src/pipeline/pipeline_tracing.h |  5 ++-
 be/src/pipeline/task_scheduler.cpp | 53 +++++++------------------------
 6 files changed, 33 insertions(+), 159 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index de8d3e76c6f..2508040ea3f 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -80,15 +80,6 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {
     return ready ? nullptr : this;
 }
 
-Dependency* FinishDependency::is_blocked_by(PipelineTask* task) {
-    std::unique_lock<std::mutex> lc(_task_lock);
-    auto ready = _ready.load();
-    if (!ready && task) {
-        _add_block_task(task);
-    }
-    return ready ? nullptr : this;
-}
-
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 5a67881c23d..891cc52a712 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -189,24 +189,6 @@ struct FakeSharedState final : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(FakeSharedState)
 };
 
-struct FakeDependency final : public Dependency {
-public:
-    using SharedState = FakeSharedState;
-    FakeDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "FakeDependency", query_ctx) {}
-
-    [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override { return nullptr; }
-};
-
-struct FinishDependency : public Dependency {
-public:
-    using SharedState = FakeSharedState;
-    FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
-            : Dependency(id, node_id, name, true, query_ctx) {}
-
-    [[nodiscard]] Dependency* is_blocked_by(PipelineTask* task) override;
-};
-
 struct CountedFinishDependency final : public Dependency {
 public:
     using SharedState = FakeSharedState;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index b0925ad875c..0ea82e305fd 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -57,7 +57,6 @@ PipelineTask::PipelineTask(
           _prepared(false),
           _opened(false),
           _state(state),
-          _cur_state(PipelineTaskState::NOT_READY),
           _fragment_context(fragment_context),
           _parent_profile(parent_profile),
           _operators(pipeline->operator_xs()),
@@ -79,7 +78,6 @@ PipelineTask::PipelineTask(
 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
                              QueryContext* query_ctx) {
     DCHECK(_sink);
-    DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
     _init_profile();
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_CPU_TIMER(_task_cpu_timer);
@@ -116,8 +114,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
         std::copy(deps.begin(), deps.end(),
                   std::inserter(_filter_dependencies, _filter_dependencies.end()));
     }
-    // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
-    set_state(PipelineTaskState::RUNNABLE);
     _prepared = true;
     return Status::OK();
 }
@@ -205,6 +201,11 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
+    *eos = _eos;
+    if (_eos) {
+        // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
+        return Status::OK();
+    }
     int64_t time_spent = 0;
 
     ThreadCpuStopWatch cpu_time_stop_watch;
@@ -220,9 +221,7 @@ Status PipelineTask::execute(bool* eos) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
     }};
-    *eos = false;
     if (has_dependency() || _runtime_filter_blocked_dependency() != nullptr) {
-        set_state(PipelineTaskState::BLOCKED);
         return Status::OK();
     }
     // The status must be runnable
@@ -232,16 +231,13 @@ Status PipelineTask::execute(bool* eos) {
             RETURN_IF_ERROR(_open());
         }
         if (!source_can_read() || !sink_can_write()) {
-            set_state(PipelineTaskState::BLOCKED);
             return Status::OK();
         }
     }
 
-    Status status = Status::OK();
     while (!_fragment_context->is_canceled()) {
         if ((_root->need_data_from_children(_state) && !source_can_read()) || !sink_can_write()) {
-            set_state(PipelineTaskState::BLOCKED);
-            break;
+            return Status::OK();
         }
 
         /// When a task is cancelled,
@@ -265,6 +261,7 @@ Status PipelineTask::execute(bool* eos) {
             continue;
         }
 
+        *eos = _eos;
         // Pull block from operator chain
         if (!_dry_run) {
             SCOPED_TIMER(_get_block_timer);
@@ -277,22 +274,26 @@ Status PipelineTask::execute(bool* eos) {
             }
         } else {
             *eos = true;
+            _eos = true;
         }
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+            Status status = Status::OK();
             status = _sink->sink(_state, block, *eos);
             if (!status.is<ErrorCode::END_OF_FILE>()) {
                 RETURN_IF_ERROR(status);
             }
             *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
             if (*eos) { // just return, the scheduler will do finish work
-                break;
+                _eos = true;
+                return Status::OK();
             }
         }
     }
 
-    return status;
+    static_cast<void>(get_task_queue()->push_back(this));
+    return Status::OK();
 }
 
 bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) {
@@ -349,20 +350,6 @@ void PipelineTask::finalize() {
     _le_state_map.clear();
 }
 
-// The FSM see PipelineTaskState's comment
-void PipelineTask::set_state(PipelineTaskState state) {
-    DCHECK(_cur_state != PipelineTaskState::FINISHED);
-
-    if (_cur_state == state) {
-        return;
-    }
-    if (_cur_state == PipelineTaskState::RUNNABLE && state != PipelineTaskState::RUNNABLE) {
-        COUNTER_UPDATE(_block_counts, 1);
-    }
-
-    _cur_state = state;
-}
-
 Status PipelineTask::close(Status exec_status) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
@@ -404,9 +391,9 @@ std::string PipelineTask::debug_string() {
     auto elapsed = (MonotonicNanos() - _fragment_context->create_time()) / 1000000000.0;
     auto* cur_blocked_dep = _blocked_dep;
     fmt::format_to(debug_string_buffer,
-                   "PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
+                   "PipelineTask[this = {}, dry run = {}, elapse time "
                    "= {}s], block dependency = {}, is running = {}\noperators: ",
-                   (void*)this, get_state_name(_cur_state), _dry_run, elapsed,
+                   (void*)this, _dry_run, elapsed,
                    cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL",
                    is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ddf6b190eb7..4ca3abbc4c5 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -42,60 +42,6 @@ class PipelineFragmentContext;
 
 namespace doris::pipeline {
 
-/**
- * PipelineTaskState indicates all possible states of a pipeline task.
- * A FSM is described as below:
- *
- *                 |-----------------------------------------------------|
- *                 |---|                  transfer 2    transfer 3       |   transfer 4
- *                     |-------> BLOCKED ------------|                   |---------------------------------------> CANCELED
- *              |------|                             |                   | transfer 5           transfer 6|
- * NOT_READY ---| transfer 0                         |-----> RUNNABLE ---|---------> PENDING_FINISH ------|
- *              |                                    |          ^        |                      transfer 7|
- *              |------------------------------------|          |--------|---------------------------------------> FINISHED
- *                transfer 1                                   transfer 9          transfer 8
- * BLOCKED include BLOCKED_FOR_DEPENDENCY, BLOCKED_FOR_SOURCE and BLOCKED_FOR_SINK.
- *
- * transfer 0 (NOT_READY -> BLOCKED): this pipeline task has some incomplete dependencies
- * transfer 1 (NOT_READY -> RUNNABLE): this pipeline task has no incomplete dependencies
- * transfer 2 (BLOCKED -> RUNNABLE): runnable condition for this pipeline task is met (e.g. get a new block from rpc)
- * transfer 3 (RUNNABLE -> BLOCKED): runnable condition for this pipeline task is not met (e.g. sink operator send a block by RPC and wait for a response)
- * transfer 4 (RUNNABLE -> CANCELED): current fragment is cancelled
- * transfer 5 (RUNNABLE -> PENDING_FINISH): this pipeline task completed but wait for releasing resources hold by itself
- * transfer 6 (PENDING_FINISH -> CANCELED): current fragment is cancelled
- * transfer 7 (PENDING_FINISH -> FINISHED): this pipeline task completed and resources hold by itself have been released already
- * transfer 8 (RUNNABLE -> FINISHED): this pipeline task completed and no resource need to be released
- * transfer 9 (RUNNABLE -> RUNNABLE): this pipeline task yields CPU and re-enters the runnable queue if it is runnable and has occupied CPU for a max time slice
- */
-enum class PipelineTaskState : uint8_t {
-    NOT_READY = 0, // do not prepare
-    BLOCKED = 1,   // blocked by dependency
-    RUNNABLE = 2,  // can execute
-    PENDING_FINISH =
-            3,    // compute task is over, but still hold resource. like some scan and sink task
-    FINISHED = 4, // finish with a regular state
-    CANCELED = 5, // being cancelled
-
-};
-
-inline const char* get_state_name(PipelineTaskState idx) {
-    switch (idx) {
-    case PipelineTaskState::NOT_READY:
-        return "NOT_READY";
-    case PipelineTaskState::BLOCKED:
-        return "BLOCKED";
-    case PipelineTaskState::RUNNABLE:
-        return "RUNNABLE";
-    case PipelineTaskState::PENDING_FINISH:
-        return "PENDING_FINISH";
-    case PipelineTaskState::FINISHED:
-        return "FINISHED";
-    case PipelineTaskState::CANCELED:
-        return "CANCELED";
-    }
-    __builtin_unreachable();
-}
-
 class TaskQueue;
 class PriorityTaskQueue;
 class Dependency;
@@ -195,7 +141,7 @@ public:
     int task_id() const { return _index; };
 
     void clear_blocking_state() {
-        if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) {
+        if (!_finished && _blocked_dep) {
             _blocked_dep->set_ready();
             _blocked_dep = nullptr;
         }
@@ -237,8 +183,6 @@ public:
     }
 
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
-    PipelineTaskState get_state() const { return _cur_state; }
-    void set_state(PipelineTaskState state);
 
     bool is_running() { return _running.load(); }
     void set_running(bool running) { _running = running; }
@@ -264,8 +208,8 @@ public:
             LOG(INFO) << "query id|instanceid " << print_id(_state->query_id()) << "|"
                       << print_id(_state->fragment_instance_id())
                       << " current pipeline exceed run time "
-                      << config::enable_debug_log_timeout_secs << " seconds. Task state "
-                      << get_state_name(get_state()) << "/n task detail:" << debug_string();
+                      << config::enable_debug_log_timeout_secs << " seconds. "
+                      << "/n task detail:" << debug_string();
         }
     }
 
@@ -332,7 +276,6 @@ private:
     RuntimeState* _state = nullptr;
     int _previous_schedule_id = -1;
     uint32_t _schedule_time = 0;
-    PipelineTaskState _cur_state;
     std::unique_ptr<doris::vectorized::Block> _block;
     PipelineFragmentContext* _fragment_context = nullptr;
     TaskQueue* _task_queue = nullptr;
@@ -396,6 +339,7 @@ private:
     std::mutex _release_lock;
 
     std::atomic<bool> _running {false};
+    std::atomic<bool> _eos {false};
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index aad0a7f9ee8..5d446e6f3d3 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -39,12 +39,11 @@ struct ScheduleRecord {
     uint64_t thread_id;
     uint64_t start_time;
     uint64_t end_time;
-    std::string_view state_name;
 
     bool operator<(const ScheduleRecord& rhs) const { return start_time < rhs.start_time; }
     std::string to_string(uint64_t append_value) const {
-        return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id,
-                           core_id, thread_id, start_time, end_time, state_name, append_value);
+        return fmt::format("{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id, core_id,
+                           thread_id, start_time, end_time, append_value);
     }
 };
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index fbb67afdf46..5dc8982e426 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -72,7 +72,7 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
 }
 
 // after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) {
+void _close_task(PipelineTask* task, Status exec_status) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
@@ -86,12 +86,10 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
     // We have already refactor all source and sink api, the close API does not need waiting
     // for pending finish now. So that could call close directly.
     Status status = task->close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
+    if (!status.ok()) {
         task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                          std::string(status.msg()));
-        state = PipelineTaskState::CANCELED;
     }
-    task->set_state(state);
     task->finalize();
     task->set_running(false);
     task->fragment_context()->close_a_pipeline();
@@ -113,25 +111,18 @@ void TaskScheduler::_do_work(size_t index) {
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
 
-        auto state = task->get_state();
         // If the state is PENDING_FINISH, then the task is come from blocked queue, its is_pending_finish
         // has to return false. The task is finished and need to close now.
-        if (state == PipelineTaskState::PENDING_FINISH || canceled) {
+        if (canceled) {
             // may change from pending FINISH,should called cancel
             // also may change form BLOCK, other task called cancel
 
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-            _close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
-                        exec_status);
+            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
-        DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED)
-                << "task already finish: " << task->debug_string();
-
-        task->set_state(PipelineTaskState::RUNNABLE);
 
         // task exec
         bool eos = false;
@@ -158,10 +149,8 @@ void TaskScheduler::_do_work(size_t index) {
                 status = task->execute(&eos);
 
                 uint64_t end_time = MonotonicMicros();
-                std::string_view state_name = get_state_name(task->get_state());
                 ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                        {query_id, task_name, core_id, thread_id, start_time, end_time,
-                         state_name});
+                        {query_id, task_name, core_id, thread_id, start_time, end_time});
             } else {
                 status = task->execute(&eos);
             }
@@ -171,14 +160,7 @@ void TaskScheduler::_do_work(size_t index) {
 
         task->set_previous_core_id(index);
 
-        if (status.is<ErrorCode::END_OF_FILE>()) {
-            // Sink operator finished, just close task now.
-            _close_task(task, PipelineTaskState::FINISHED, Status::OK());
-            continue;
-        } else if (!status.ok()) {
-            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
-                                        print_id(task->query_context()->query_id()),
-                                        status.to_string());
+        if (!status.ok()) {
             // Print detail informations below when you debugging here.
             //
             // LOG(WARNING)<< "task:\n"<<task->debug_string();
@@ -186,7 +168,10 @@ void TaskScheduler::_do_work(size_t index) {
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                  std::string(status.to_string_no_stack()));
-            _close_task(task, PipelineTaskState::CANCELED, status);
+            LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
+                                        print_id(task->query_context()->query_id()),
+                                        status.to_string());
+            _close_task(task, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -201,29 +186,15 @@ void TaskScheduler::_do_work(size_t index) {
             // added to running queue when dependency is ready.
             if (task->is_pending_finish()) {
                 // Only meet eos, should set task to PENDING_FINISH state
-                task->set_state(PipelineTaskState::PENDING_FINISH);
                 task->set_running(false);
             } else {
                 Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, PipelineTaskState::FINISHED, exec_status);
+                _close_task(task, exec_status);
             }
             continue;
         }
 
-        auto pipeline_state = task->get_state();
-        switch (pipeline_state) {
-        case PipelineTaskState::BLOCKED:
-            task->set_running(false);
-            break;
-        case PipelineTaskState::RUNNABLE:
-            task->set_running(false);
-            static_cast<void>(_task_queue->push_back(task, index));
-            break;
-        default:
-            DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
-                          << " task: " << task->debug_string();
-            break;
-        }
+        task->set_running(false);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to