github-actions[bot] commented on code in PR #26879:
URL: https://github.com/apache/doris/pull/26879#discussion_r1395738661


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -21,10 +21,140 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/memory/mem_tracker.h"
 
 namespace doris::pipeline {
 
+void Dependency::add_block_task(PipelineXTask* task) {
+    // TODO(gabriel): support read dependency
+    if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
+        return;
+    }
+    task->set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
+    _blocked_task.push_back(task);
+}
+
+void WriteDependency::add_write_block_task(PipelineXTask* task) {
+    DCHECK(_write_blocked_task.empty() ||
+           _write_blocked_task[_write_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    DCHECK(!_ready_for_write) << "It is not allowed: task: " << task->debug_string()
+                              << " \n dependency: " << debug_string()
+                              << " \n state: " << get_state_name(task->get_state());
+    task->set_state(PipelineTaskState::BLOCKED_FOR_SINK);
+    task->set_blocked(true);
+    _write_blocked_task.push_back(task);
+}
+
+void FinishDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_finish_blocked_task.empty() ||
+           _finish_blocked_task[_finish_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    DCHECK(!_ready_to_finish) << "It is not allowed: task: " << task->debug_string()
+                              << " \n dependency: " << debug_string()
+                              << " \n state: " << get_state_name(task->get_state());
+    task->set_state(PipelineTaskState::PENDING_FINISH);
+    task->set_blocked(true);
+    _finish_blocked_task.push_back(task);
+}
+
+void RuntimeFilterDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_filter_blocked_task.empty() ||
+           _filter_blocked_task[_filter_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    DCHECK(_blocked_by_rf) << "It is not allowed: task: " << task->debug_string()
+                           << " \n dependency: " << debug_string()
+                           << " \n state: " << get_state_name(task->get_state());
+    task->set_state(PipelineTaskState::BLOCKED_FOR_RF);
+    task->set_blocked(true);
+    _filter_blocked_task.push_back(task);
+}
+
+void Dependency::set_ready_for_read() {
+    if (_ready_for_read) {
+        return;
+    }
+    _read_dependency_watcher.stop();
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_read) {
+            return;
+        }
+        _ready_for_read = true;
+        local_block_task.swap(_blocked_task);
+    }
+}
+
+void WriteDependency::set_ready_for_write() {
+    if (_ready_for_write) {
+        return;
+    }
+    _write_dependency_watcher.stop();
+
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_write) {
+            return;
+        }
+        _ready_for_write = true;
+        local_block_task.swap(_write_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        DCHECK(task->get_state() != PipelineTaskState::RUNNABLE);
+        DCHECK(avoid_using_blocked_queue(task->get_state()));
+        task->try_wake_up(this);
+    }
+}
+
+void FinishDependency::set_ready_to_finish() {

Review Comment:
   warning: method 'set_ready_to_finish' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:188:
   ```diff
   -     void set_ready_to_finish();
   +     static void set_ready_to_finish();
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -21,10 +21,140 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/memory/mem_tracker.h"
 
 namespace doris::pipeline {
 
+void Dependency::add_block_task(PipelineXTask* task) {
+    // TODO(gabriel): support read dependency
+    if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
+        return;
+    }
+    task->set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
+    _blocked_task.push_back(task);
+}
+
+void WriteDependency::add_write_block_task(PipelineXTask* task) {

Review Comment:
   warning: method 'add_write_block_task' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:152:
   ```diff
   -     void add_write_block_task(PipelineXTask* task);
   +     static void add_write_block_task(PipelineXTask* task);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to