Gabriel39 commented on code in PR #37829:
URL: https://github.com/apache/doris/pull/37829#discussion_r1680478262


##########
be/src/pipeline/local_exchange/local_exchanger.h:
##########
@@ -61,8 +66,10 @@ class ExchangerBase {
     friend class LocalExchangeSourceLocalState;
     friend class LocalExchangeSinkOperatorX;
     friend class LocalExchangeSinkLocalState;
-    std::atomic<int> _running_sink_operators = 0;
-    std::atomic<int> _running_source_operators = 0;
+    // ref from LocalExchangeSharedState
+    const std::atomic<int>& _running_sink_operators;

Review Comment:
   Just delete this



##########
be/src/pipeline/dependency.h:
##########
@@ -72,16 +70,33 @@ struct BasicSharedState {
                 << " and expect type is" << typeid(TARGET).name();
         return reinterpret_cast<const TARGET*>(this);
     }
-    std::vector<DependencySPtr> source_deps;
-    std::vector<DependencySPtr> sink_deps;
+    //sub when operator close
+
+    virtual void sub_running_sink_operators();
+    // When all sources are closed, it will wake up the sink
+    // (for example, if there is a limit in the downstream pipeline, causing 
the source to end early).
+    virtual void sub_running_source_operators();

Review Comment:
   remove `virtual`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to