Gabriel39 commented on code in PR #37829: URL: https://github.com/apache/doris/pull/37829#discussion_r1680478262
########## be/src/pipeline/local_exchange/local_exchanger.h: ########## @@ -61,8 +66,10 @@ class ExchangerBase { friend class LocalExchangeSourceLocalState; friend class LocalExchangeSinkOperatorX; friend class LocalExchangeSinkLocalState; - std::atomic<int> _running_sink_operators = 0; - std::atomic<int> _running_source_operators = 0; + // ref from LocalExchangeSharedState + const std::atomic<int>& _running_sink_operators; Review Comment: Just delete this ########## be/src/pipeline/dependency.h: ########## @@ -72,16 +70,33 @@ struct BasicSharedState { << " and expect type is" << typeid(TARGET).name(); return reinterpret_cast<const TARGET*>(this); } - std::vector<DependencySPtr> source_deps; - std::vector<DependencySPtr> sink_deps; + //sub when operator close + + virtual void sub_running_sink_operators(); + // When all sources are closed, it will wake up the sink + // (for example, if there is a limit in the downstream pipeline, causing the source to end early). + virtual void sub_running_source_operators(); Review Comment: remove `virtual` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org