This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d2dbe7898 [Bug][Pipeline] Run clickbench dead lock in pipeline exec 
engine (#18211)
1d2dbe7898 is described below

commit 1d2dbe78989e43976cce5327538b30d5303b9fe9
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Mar 30 21:41:57 2023 +0800

    [Bug][Pipeline] Run clickbench dead lock in pipeline exec engine (#18211)
    
    Running ClickBench on the pipeline execution engine may deadlock on some queries.
---
 be/src/vec/runtime/vdata_stream_recvr.cpp |  5 +++++
 be/src/vec/runtime/vdata_stream_recvr.h   |  4 ++++
 be/src/vec/sink/vdata_stream_sender.h     | 12 ++++++------
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 79cb7b5cfd..c3bb910d70 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -373,6 +373,11 @@ void VDataStreamRecvr::add_block(Block* block, int 
sender_id, bool use_move) {
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
 
+bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
+    int use_sender_id = _is_merging ? sender_id : 0;
+    return _sender_queues[use_sender_id]->queue_empty();
+}
+
 bool VDataStreamRecvr::ready_to_read() {
     for (size_t i = 0; i < _sender_queues.size(); i++) {
         if (_sender_queues[i]->should_wait()) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1fc635a7f7..66941dae48 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -68,6 +68,8 @@ public:
 
     void add_block(Block* block, int sender_id, bool use_move);
 
+    bool sender_queue_empty(int sender_id);
+
     bool ready_to_read();
 
     Status get_next(Block* block, bool* eos);
@@ -174,6 +176,8 @@ public:
 
     void close();
 
+    bool queue_empty() { return _block_queue_empty; }
+
 protected:
     virtual void _update_block_queue_empty() {}
     Status _inner_get_batch(Block* block, bool* eos);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 5bf116af12..d0357cdbcc 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -233,8 +233,7 @@ public:
               _need_close(false),
               _brpc_dest_addr(brpc_dest),
               _is_transfer_chain(is_transfer_chain),
-              
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
-              _capacity(std::max(1, buffer_size / 
std::max(_row_desc.get_row_size(), 1))) {
+              
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) 
{
         std::string localhost = BackendOptions::get_localhost();
         _is_local = (_brpc_dest_addr.hostname == localhost) &&
                     (_brpc_dest_addr.port == config::brpc_port);
@@ -292,8 +291,6 @@ public:
         return uid.to_string();
     }
 
-    TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; 
}
-
     bool is_local() const { return _is_local; }
 
     virtual void ch_roll_pb_block();
@@ -302,7 +299,11 @@ public:
         if (!is_local()) {
             return true;
         }
-        return !_local_recvr || _local_recvr->is_closed() || 
!_local_recvr->exceeds_limit(0);
+
+        // Even if the local recvr's queued memory exceeds the exchange node's mem limit, keep
+        // sending while this sender's queue is empty: the merge sort needs a block per queue, else it deadlocks.
+        return !_local_recvr || _local_recvr->is_closed() || 
!_local_recvr->exceeds_limit(0) ||
+               _local_recvr->sender_queue_empty(_parent->_sender_id);
     }
 
 protected:
@@ -363,7 +364,6 @@ protected:
     bool _send_query_statistics_with_every_batch;
     RuntimeState* _state;
 
-    size_t _capacity;
     bool _is_local;
     std::shared_ptr<VDataStreamRecvr> _local_recvr;
     // serialized blocks for broadcasting; we need two so we can write


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to