This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2310915c2692f38b43d7bbdf4b94f3bc734cc13f Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue May 28 10:31:31 2024 +0800 [fix](pipeline) Fix query hang if limited rows is reached (#35466) ## Proposed changes Some operators have a limit condition; the source operator should notify the sink operator when the limit is reached. Although FE has limit logic, it does not always send it. ## Further comments If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc... --- be/src/pipeline/pipeline_x/dependency.cpp | 7 +++++++ be/src/pipeline/pipeline_x/dependency.h | 5 +++++ .../local_exchange/local_exchange_sink_operator.cpp | 11 +++++++++-- .../local_exchange/local_exchange_source_operator.cpp | 17 +++++++++++++++-- .../local_exchange/local_exchange_source_operator.h | 1 + .../pipeline_x/local_exchange/local_exchanger.h | 4 ++++ 6 files changed, 41 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 093e26ff854..011c32901bc 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -194,6 +194,13 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } +void LocalExchangeSharedState::sub_running_source_operators() { + std::unique_lock<std::mutex> lc(le_lock); + if (exchanger->_running_source_operators.fetch_sub(1) == 1) { + _set_always_ready(); + } +} + LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { source_deps.resize(num_instances, nullptr); mem_trackers.resize(num_instances, nullptr); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 525a6dea562..1d14c66a5bc 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -784,11 +784,16 @@ public: } }; void 
sub_running_sink_operators(); + void sub_running_source_operators(); void _set_always_ready() { for (auto& dep : source_deps) { DCHECK(dep); dep->set_always_ready(); } + for (auto& dep : sink_deps) { + DCHECK(dep); + dep->set_always_ready(); + } } Dependency* get_dep_by_channel_id(int channel_id) { return source_deps[channel_id].get(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index d4252de7153..74cfc50175c 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -61,10 +61,11 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _release_count: {}", + "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _release_count); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators, + _release_count); return fmt::to_string(debug_string_buffer); } @@ -75,6 +76,12 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); + // If all exchange sources ended due to limit reached, current task should also finish + if (local_state._exchanger->_running_source_operators == 0) { + local_state._release_count = true; + local_state._shared_state->sub_running_sink_operators(); + return Status::EndOfFile("receiver eof"); + } if (eos) { 
local_state._shared_state->sub_running_sink_operators(); local_state._release_count = true; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 4b0840ea01a..b56cd2b8531 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -47,13 +47,26 @@ Status LocalExchangeSourceLocalState::open(RuntimeState* state) { return Status::OK(); } +Status LocalExchangeSourceLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + + if (_shared_state) { + _shared_state->sub_running_source_operators(); + } + + return Base::close(state); +} + std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}", + "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " + "_running_sink_operators: {}, _running_source_operators: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 7d416b10c19..c7583d1351c 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -36,6 +36,7 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; + Status 
close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; private: diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 69e9f79d3e3..cc91b4f1a10 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -29,11 +29,13 @@ class Exchanger { public: Exchanger(int running_sink_operators, int num_partitions) : _running_sink_operators(running_sink_operators), + _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_partitions) {} Exchanger(int running_sink_operators, int num_sources, int num_partitions) : _running_sink_operators(running_sink_operators), + _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), _num_sources(num_sources) {} @@ -48,8 +50,10 @@ protected: friend struct LocalExchangeSharedState; friend struct ShuffleBlockWrapper; friend class LocalExchangeSourceLocalState; + friend class LocalExchangeSinkOperatorX; friend class LocalExchangeSinkLocalState; std::atomic<int> _running_sink_operators = 0; + std::atomic<int> _running_source_operators = 0; const int _num_partitions; const int _num_senders; const int _num_sources; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org