This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b864aa7aa23 [fix](pipeline) Fix query hang up if limited rows is 
reached (#35513) (#35746)
b864aa7aa23 is described below

commit b864aa7aa2349a8411425e8d878493d363f15d93
Author: Gabriel <[email protected]>
AuthorDate: Fri May 31 22:50:57 2024 +0800

    [fix](pipeline) Fix query hang up if limited rows is reached (#35513) 
(#35746)
    
    Follow-up for #35466.
    
    We should ensure that closed tasks do not block other tasks.
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      | 48 ++++++++++++++--------
 be/src/pipeline/exec/exchange_sink_buffer.h        |  9 ++--
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  9 ++--
 be/src/pipeline/exec/exchange_sink_operator.h      |  3 ++
 be/src/pipeline/pipeline_fragment_context.h        |  2 +
 .../local_exchange_source_operator.cpp             |  8 ++++
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 25 +++++++++++
 .../pipeline_x/local_exchange/local_exchanger.h    |  4 ++
 be/src/pipeline/pipeline_x/operator.h              |  8 ++++
 .../pipeline_x/pipeline_x_fragment_context.h       |  7 ++++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 11 +++--
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  6 +++
 be/src/runtime/fragment_mgr.cpp                    |  2 +
 13 files changed, 114 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7f46bfcf353..39a6a59bd49 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -85,7 +85,8 @@ namespace pipeline {
 
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id,
-                                               int send_id, int be_number, 
RuntimeState* state)
+                                               int send_id, int be_number, 
RuntimeState* state,
+                                               ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
@@ -94,10 +95,8 @@ ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId 
query_id, PlanNodeId de
           _sender_id(send_id),
           _be_number(be_number),
           _state(state),
-          _context(state->get_query_ctx()) {}
-
-template <typename Parent>
-ExchangeSinkBuffer<Parent>::~ExchangeSinkBuffer() = default;
+          _context(state->get_query_ctx()),
+          _parent(parent) {}
 
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::close() {
@@ -251,8 +250,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
             broadcast_q = _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+        _turn_off_channel(id);
         return Status::OK();
     }
 
@@ -410,8 +408,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         }
         broadcast_q.pop();
     } else {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+        _turn_off_channel(id);
     }
 
     return Status::OK();
@@ -443,10 +440,7 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[id]);
-        if (!_rpc_channel_is_idle[id]) {
-            _rpc_channel_is_idle[id] = true;
-            _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
-        }
+        _turn_off_channel(id);
     }
 }
 
@@ -454,17 +448,19 @@ template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_failed(InstanceLoId id, const std::string& 
err) {
     _is_finishing = true;
     _context->cancel(err, Status::Cancelled(err));
-    _ended(id);
+    if constexpr (std::is_same_v<ExchangeSinkLocalState, Parent>) {
+        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[id]);
+        _turn_off_channel(id, true);
+    } else {
+        _ended(id);
+    }
 }
 
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    if (!_rpc_channel_is_idle[id]) {
-        _rpc_channel_is_idle[id] = true;
-        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
-    }
+    _turn_off_channel(id, std::is_same_v<ExchangeSinkLocalState, Parent>); // cleanup only when the parent is an ExchangeSinkLocalState
     std::queue<BroadcastTransmitInfo<Parent>, 
std::list<BroadcastTransmitInfo<Parent>>> empty;
     swap(empty, _instance_to_broadcast_package_queue[id]);
 }
@@ -475,6 +471,22 @@ bool 
ExchangeSinkBuffer<Parent>::_is_receiver_eof(InstanceLoId id) {
     return _instance_to_receiver_eof[id];
 }
 
+template <typename Parent>
+void ExchangeSinkBuffer<Parent>::_turn_off_channel(InstanceLoId id, bool 
cleanup) {
+    if (!_rpc_channel_is_idle[id]) {
+        _rpc_channel_is_idle[id] = true;
+        auto all_done = _busy_channels.fetch_sub(1) == 1;
+        _set_ready_to_finish(all_done);
+        if (cleanup && all_done) {
+            auto weak_task_ctx = weak_task_exec_ctx();
+            if (auto pip_ctx = weak_task_ctx.lock()) {
+                DCHECK(_parent);
+                _parent->set_reach_limit();
+            }
+        }
+    }
+}
+
 template <typename Parent>
 void ExchangeSinkBuffer<Parent>::get_max_min_rpc_time(int64_t* max_time, 
int64_t* min_time) {
     int64_t local_max_time = 0;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 74207bbffd3..cd5502ee6d0 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -47,6 +47,7 @@ using InstanceLoId = int64_t;
 
 namespace pipeline {
 class Dependency;
+class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
@@ -199,8 +200,8 @@ template <typename Parent>
 class ExchangeSinkBuffer : public HasTaskExecutionCtx {
 public:
     ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int 
send_id, int be_number,
-                       RuntimeState* state);
-    ~ExchangeSinkBuffer();
+                       RuntimeState* state, ExchangeSinkLocalState* parent = 
nullptr);
+    ~ExchangeSinkBuffer() = default;
     void register_sink(TUniqueId);
 
     Status add_block(TransmitInfo<Parent>&& request);
@@ -271,6 +272,7 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
+    inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
@@ -278,7 +280,8 @@ private:
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _finish_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
-    std::atomic<bool> _should_stop {false};
+    std::atomic<bool> _should_stop = false;
+    ExchangeSinkLocalState* _parent = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dfcb3e8e120..33b68a5ac30 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -185,7 +185,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
     _sink_buffer = 
std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
-            id, p._dest_node_id, _sender_id, _state->be_number(), state);
+            id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
 
     register_channels(_sink_buffer.get());
     _queue_dependency =
@@ -681,8 +681,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, 
_busy_channels = {})",
-                   _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load());
+    fmt::format_to(debug_string_buffer,
+                   ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, 
_is_finishing = {}), "
+                   "_reach_limit: {}",
+                   _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load(),
+                   _sink_buffer->_is_finishing.load(), _reach_limit.load());
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index f275365c0e8..4a217c940ce 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -131,6 +131,8 @@ public:
     RuntimeProfile::Counter* compress_timer() { return _compress_timer; }
     RuntimeProfile::Counter* uncompressed_bytes_counter() { return 
_uncompressed_bytes_counter; }
     [[nodiscard]] bool transfer_large_data_by_brpc() const;
+    bool is_finished() const override { return _reach_limit.load(); }
+    void set_reach_limit() { _reach_limit = true; } // set by the sink buffer once all channels are turned off with cleanup
 
     [[nodiscard]] int sender_id() const { return _sender_id; }
 
@@ -231,6 +233,7 @@ private:
 
     // for external table sink hash partition
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
+    std::atomic<bool> _reach_limit = false;
 };
 
 class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 009a2a2f22d..770b64d0dc0 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -110,6 +110,8 @@ public:
 
     void close_a_pipeline();
 
+    virtual void clear_finished_tasks() {}
+
     virtual void add_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
 
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index b56cd2b8531..dbc4e37bbab 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
         return Status::OK();
     }
 
+    if (_exchanger) {
+        _exchanger->close(*this);
+    }
     if (_shared_state) {
         _shared_state->sub_running_source_operators();
     }
@@ -67,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int 
indentation_level) c
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
+    size_t i = 0; // index printed before each tracker; advanced once per iteration
+    fmt::format_to(debug_string_buffer, ", MemTrackers: ");
+    for (auto* mem_tracker : _shared_state->mem_trackers) {
+        fmt::format_to(debug_string_buffer, "{}: {}, ", i++, mem_tracker->consumption());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index ce1f05a22bf..abcc0161fd7 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
     return Status::OK();
 }
 
+void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    PartitionedBlock partitioned_block;
+    while 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+        auto block_wrapper = partitioned_block.first;
+        local_state._shared_state->sub_mem_usage(
+                local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
+        block_wrapper->unref(local_state._shared_state);
+    }
+}
+
 Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                    LocalExchangeSourceLocalState& local_state) 
{
     PartitionedBlock partitioned_block;
@@ -185,6 +195,14 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     return Status::OK();
 }
 
+void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    vectorized::Block next_block;
+    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        local_state._shared_state->sub_mem_usage(local_state._channel_id,
+                                                 next_block.allocated_bytes());
+    }
+}
+
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
@@ -254,6 +272,13 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
     return Status::OK();
 }
 
+void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
+    vectorized::Block next_block;
+    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        // do nothing
+    }
+}
+
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index cc91b4f1a10..f3b210b11f3 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -45,6 +45,7 @@ public:
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos,
                         LocalExchangeSinkLocalState& local_state) = 0;
     virtual ExchangeType get_type() const = 0;
+    virtual void close(LocalExchangeSourceLocalState& local_state) {}
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -95,6 +96,7 @@ public:
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
+    void close(LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return 
ExchangeType::HASH_SHUFFLE; }
 
 protected:
@@ -137,6 +139,7 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
+    void close(LocalExchangeSourceLocalState& local_state) override;
 
 private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
@@ -175,6 +178,7 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
+    void close(LocalExchangeSourceLocalState& local_state) override;
 
 private:
     std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index efa35b2c2fe..97035358037 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -490,6 +490,7 @@ public:
     // idempotent (e.g. wait for runtime filters).
     virtual Status open(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state, Status exec_status) = 0;
+    [[nodiscard]] virtual bool is_finished() const { return false; }
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level) 
const = 0;
 
@@ -595,6 +596,13 @@ public:
     [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
                                                    LocalSinkStateInfo& info) = 
0;
 
+    [[nodiscard]] bool is_finished(RuntimeState* state) const { // true once the sink local state reports it is finished
+        auto result = state->get_sink_local_state_result();
+        if (!result) {
+            return false; // no sink local state to query: report "not finished" explicitly instead of converting a Status to bool
+        }
+        return result.value()->is_finished();
+    }
     template <class TARGET>
     TARGET& cast() {
         DCHECK(dynamic_cast<TARGET*>(this))
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 34d00c07652..ea2212357a2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -70,6 +70,13 @@ public:
 
     ~PipelineXFragmentContext() override;
 
+    void clear_finished_tasks() override { // clear the blocking state of every task whose sink is already finished
+        for (auto& task_list : _tasks) {
+            for (auto& task : task_list) {
+                task->stop_if_finished();
+            }
+        }
+    }
     void instance_ids(std::vector<TUniqueId>& ins_ids) const override {
         ins_ids.resize(_fragment_instance_ids.size());
         for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c995b66997d..b13cd8c9c61 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -247,7 +247,10 @@ Status PipelineXTask::execute(bool* eos) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
     }};
-    *eos = false;
+    *eos = _sink->is_finished(_state);
+    if (*eos) {
+        return Status::OK();
+    }
     if (has_dependency()) {
         set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
         return Status::OK();
@@ -310,7 +313,9 @@ Status PipelineXTask::execute(bool* eos) {
             return status;
         });
         // Pull block from operator chain
-        if (!_dry_run) {
+        if (_dry_run || _sink->is_finished(_state)) {
+            *eos = true;
+        } else {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             try {
@@ -319,8 +324,6 @@ Status PipelineXTask::execute(bool* eos) {
                 return Status::InternalError(e.to_string() +
                                              " task debug string: " + 
debug_string());
             }
-        } else {
-            *eos = true;
         }
 
         if (_block->rows() != 0 || *eos) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index ef55423e16c..83c07827848 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -157,6 +157,12 @@ public:
         return false;
     }
 
+    void stop_if_finished() {
+        if (_sink->is_finished(_state)) {
+            clear_blocking_state();
+        }
+    }
+
     static bool should_revoke_memory(RuntimeState* state, int64_t 
revocable_mem_bytes);
 
 private:
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ba30323addf..7003f1d49f5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1122,6 +1122,8 @@ void FragmentMgr::cancel_worker() {
                     for (auto& ins_id : ins_ids) {
                         to_cancel.push_back(ins_id);
                     }
+                } else {
+                    pipeline_itr.second->clear_finished_tasks();
                 }
             }
             for (auto it = _query_ctx_map.begin(); it != 
_query_ctx_map.end();) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to