This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 64f71a5e995 [fix](pipeline) only sub_running_sink_operators in close (#43500) (#43649) 64f71a5e995 is described below commit 64f71a5e99580cbb5332b03a960d16cb6385b587 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Tue Nov 12 15:45:56 2024 +0800 [fix](pipeline) only sub_running_sink_operators in close (#43500) (#43649) https://github.com/apache/doris/pull/43500 Previously, sub_running_sink_operators was called only when encountering EOS during sink or when all sources were closed. However, this approach has issues, as it’s possible for the user to manually cancel, in which case there may be no EOS and the sources may not be closed. This would prevent running_sink_operators from reaching zero, leading to errors. ``` PipelineTask[this = 0x7fc369fe9600, id = 0, open = true, eos = false, finish = false, dry run = false, elapse time = 26361.740784032s], block dependency = NULL, is running = true operators: LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT): id=-5, parallel_tasks=4, _channel_id: 0, _num_partitions: 4, _num_senders: 4, _num_sources: 4, _running_sink_operators: 1, _running_source_operators: 1, mem_usage: 0, data queue info: Data Queue 0: [size approx = 0, eos = false], MemTrackers: 0: 0, 1: 34537472, 2: 5701632, 3: 0, DATA_STREAM_SINK_OPERATOR: id=6, Sink Buffer: (_should_stop = false, _busy_channels = 0, _is_finishing = false), _reach_limit: false 0. this=0x7fc376438f10, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true 0. this=0x7fc3764bc110, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true 0. this=0x7fc3764bc310, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true 0. this=0x7fc3764bc510, LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY: id=-5, block task = 0, ready=true, _always_ready=true ``` --- .../local_exchange/local_exchange_sink_operator.cpp | 16 ++++++++++++---- .../local_exchange/local_exchange_sink_operator.h | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index ff243186c47..a939d25654b 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -110,6 +110,18 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } +Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(Base::exec_time_counter()); + SCOPED_TIMER(Base::_close_timer); + if (Base::_closed) { + return Status::OK(); + } + if (_shared_state) { + _shared_state->sub_running_sink_operators(); + } + return Base::close(state, exec_status); +} + std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, @@ -132,12 +144,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { - local_state._shared_state->sub_running_sink_operators(); return Status::EndOfFile("receiver eof"); } - if (eos) { - local_state._shared_state->sub_running_sink_operators(); - } return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 09b1f2cc310..4c4a400c2bd 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -45,6 +45,7 @@ public: Status open(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; std::vector<Dependency*> dependencies() const override; + Status close(RuntimeState* state, Status exec_status) override; private: friend class LocalExchangeSinkOperatorX; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org