This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2310915c2692f38b43d7bbdf4b94f3bc734cc13f
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue May 28 10:31:31 2024 +0800

    [fix](pipeline) Fix query hang if limited rows is reached (#35466)
    
    ## Proposed changes
    
    Some operators have a limit condition. When the limit is reached, the
    source operator should notify the sink operator.
    Although the FE has limit logic, it is not always sent.
    
    ## Further comments
    
    If this is a relatively large or complex change, kick off the discussion
    at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why
    you chose the solution you did and what alternatives you considered,
    etc...
---
 be/src/pipeline/pipeline_x/dependency.cpp               |  7 +++++++
 be/src/pipeline/pipeline_x/dependency.h                 |  5 +++++
 .../local_exchange/local_exchange_sink_operator.cpp     | 11 +++++++++--
 .../local_exchange/local_exchange_source_operator.cpp   | 17 +++++++++++++++--
 .../local_exchange/local_exchange_source_operator.h     |  1 +
 .../pipeline_x/local_exchange/local_exchanger.h         |  4 ++++
 6 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 093e26ff854..011c32901bc 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -194,6 +194,13 @@ void 
LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
+void LocalExchangeSharedState::sub_running_source_operators() {
+    std::unique_lock<std::mutex> lc(le_lock);
+    if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
+        _set_always_ready();
+    }
+}
+
 LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
     source_deps.resize(num_instances, nullptr);
     mem_trackers.resize(num_instances, nullptr);
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 525a6dea562..1d14c66a5bc 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -784,11 +784,16 @@ public:
         }
     };
     void sub_running_sink_operators();
+    void sub_running_source_operators();
     void _set_always_ready() {
         for (auto& dep : source_deps) {
             DCHECK(dep);
             dep->set_always_ready();
         }
+        for (auto& dep : sink_deps) {
+            DCHECK(dep);
+            dep->set_always_ready();
+        }
     }
 
     Dependency* get_dep_by_channel_id(int channel_id) { return 
source_deps[channel_id].get(); }
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index d4252de7153..74cfc50175c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -61,10 +61,11 @@ std::string LocalExchangeSinkLocalState::debug_string(int 
indentation_level) con
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _release_count: {}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, _release_count: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, _release_count);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
+                   _release_count);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -75,6 +76,12 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block*
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, 
local_state));
 
+    // If all exchange sources ended due to limit reached, current task should 
also finish
+    if (local_state._exchanger->_running_source_operators == 0) {
+        local_state._release_count = true;
+        local_state._shared_state->sub_running_sink_operators();
+        return Status::EndOfFile("receiver eof");
+    }
     if (eos) {
         local_state._shared_state->sub_running_sink_operators();
         local_state._release_count = true;
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 4b0840ea01a..b56cd2b8531 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -47,13 +47,26 @@ Status LocalExchangeSourceLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    if (_shared_state) {
+        _shared_state->sub_running_source_operators();
+    }
+
+    return Base::close(state);
+}
+
 std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) 
const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}",
+                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
+                   "_running_sink_operators: {}, _running_source_operators: 
{}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 7d416b10c19..c7583d1351c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -36,6 +36,7 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
 private:
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 69e9f79d3e3..cc91b4f1a10 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -29,11 +29,13 @@ class Exchanger {
 public:
     Exchanger(int running_sink_operators, int num_partitions)
             : _running_sink_operators(running_sink_operators),
+              _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_partitions) {}
     Exchanger(int running_sink_operators, int num_sources, int num_partitions)
             : _running_sink_operators(running_sink_operators),
+              _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_sources) {}
@@ -48,8 +50,10 @@ protected:
     friend struct LocalExchangeSharedState;
     friend struct ShuffleBlockWrapper;
     friend class LocalExchangeSourceLocalState;
+    friend class LocalExchangeSinkOperatorX;
     friend class LocalExchangeSinkLocalState;
     std::atomic<int> _running_sink_operators = 0;
+    std::atomic<int> _running_source_operators = 0;
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to