Gabriel39 commented on code in PR #23639:
URL: https://github.com/apache/doris/pull/23639#discussion_r1309560444


##########
be/src/pipeline/pipeline.h:
##########
@@ -118,6 +147,34 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
     DataSinkOperatorXPtr _sink_x;
 
     std::shared_ptr<ObjectPool> _obj_pool;
+
+    Operators _old_operators;

Review Comment:
   Use `_operators` for `Operators` and `_operator_xs` for `OperatorXs`.



##########
be/src/pipeline/pipeline.h:
##########
@@ -45,31 +45,60 @@ using PipelineId = uint32_t;
 
 class Pipeline : public std::enable_shared_from_this<Pipeline> {
     friend class PipelineTask;
+    friend class PipelineXTask;
 
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, 
std::weak_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), 
_context(context) {
+            : _pipeline_id(pipeline_id), _context(context) {
         _init_profile();
     }
 
     void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
-        pipeline->_parents.push_back(weak_from_this());
-        _dependencies.push_back(pipeline);
+        pipeline->_parents.push_back({_operator_builders.size(), 
weak_from_this()});
+        _dependencies.push_back({_operator_builders.size(), pipeline});
     }
 
     // If all dependencies are finished, this pipeline task should be 
scheduled.
     // e.g. Hash join probe task will be scheduled once Hash join build task 
is finished.
-    bool finish_one_dependency(int dependency_core_id) {
-        DCHECK(_complete_dependency < _dependencies.size());
-        bool finish = _complete_dependency.fetch_add(1) == 
_dependencies.size() - 1;
-        if (finish) {
+    void finish_one_dependency(int dep_opr, int dependency_core_id) {
+        std::lock_guard l(_depend_mutex);
+
+        int i;
+        auto dispose_short_circuit = [&](auto& operators) {
+            _always_can_read = true;
+            _always_can_write = (dep_opr == operators.size());
+
+            for (i = 0; i < _dependencies.size(); ++i) {
+                if (dep_opr == _dependencies[i].first) {
+                    _dependencies.erase(_dependencies.begin(), 
_dependencies.begin() + i + 1);
+                    break;
+                }
+            }
+        };
+
+        if (!_old_operators.empty() && _old_operators[dep_opr - 
1]->can_terminate_early()) {
+            dispose_short_circuit(_old_operators);
+        } else if (!_operators.empty() && _operators[dep_opr - 
1]->can_terminate_early()) {

Review Comment:
   There is no need to check `_operators` here, because PipelineX no longer 
   uses this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to