github-actions[bot] commented on code in PR #26879:
URL: https://github.com/apache/doris/pull/26879#discussion_r1394176172


##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -70,56 +70,66 @@ class PipelineXTask : public PipelineTask {
     // must be call after all pipeline task is finish to release resource
     Status close(Status exec_status) override;
 
+    Dependency* read_blocked_dependency() {
+        for (auto* op_dep : _read_dependencies) {
+            _blocked_dep = op_dep->read_blocked_by(this);
+            if (_blocked_dep != nullptr) {
+                // TODO(gabriel):
+                _use_blocking_queue = true;
+                _blocked_dep->start_read_watcher();
+                return _blocked_dep;
+            }
+        }
+        return nullptr;
+    }
+
     bool source_can_read() override {
         if (_dry_run) {
             return true;
         }
-        for (auto* op_dep : _read_dependencies) {
-            auto* dep = op_dep->read_blocked_by();
-            if (dep != nullptr) {
-                dep->start_read_watcher();
-                push_blocked_task_to_dependency(dep);
-                return false;
-            }
-        }
-        return true;
+        return read_blocked_dependency() == nullptr;
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
-        auto* dep = _filter_dependency->filter_blocked_by();
-        if (dep != nullptr) {
-            push_blocked_task_to_dependency(dep);
-            return false;
-        }
-        return true;
+        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
+        return false;
     }
 
-    bool sink_can_write() override {
-        auto* dep = _write_dependencies->write_blocked_by();
-        if (dep != nullptr) {
-            dep->start_write_watcher();
-            push_blocked_task_to_dependency(dep);
-            return false;
+    Dependency* write_blocked_dependency() {
+        _blocked_dep = _write_dependencies->write_blocked_by(this);
+        if (_blocked_dep != nullptr) {
+            _push_blocked_task_to_dep();
+            static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
+            return _blocked_dep;
         }
-        return true;
+        return nullptr;
     }
 
+    bool sink_can_write() override { return write_blocked_dependency() == nullptr; }
+
     Status finalize() override;
 
     std::string debug_string() override;
 
-    bool is_pending_finish() override {
+    Dependency* finish_blocked_dependency(bool skip_current_dep) {

Review Comment:
   warning: method 'finish_blocked_dependency' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static Dependency* finish_blocked_dependency(bool skip_current_dep) {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to