This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1d2dbe7898 [Bug][Pipeline] Run clickbench dead lock in pipeline exec engine (#18211) 1d2dbe7898 is described below commit 1d2dbe78989e43976cce5327538b30d5303b9fe9 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Mar 30 21:41:57 2023 +0800 [Bug][Pipeline] Run clickbench dead lock in pipeline exec engine (#18211) In pipeline exec engine run clickbench may dead lock in some query --- be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +++++ be/src/vec/runtime/vdata_stream_recvr.h | 4 ++++ be/src/vec/sink/vdata_stream_sender.h | 12 ++++++------ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 79cb7b5cfd..c3bb910d70 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -373,6 +373,11 @@ void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { _sender_queues[use_sender_id]->add_block(block, use_move); } +bool VDataStreamRecvr::sender_queue_empty(int sender_id) { + int use_sender_id = _is_merging ? sender_id : 0; + return _sender_queues[use_sender_id]->queue_empty(); +} + bool VDataStreamRecvr::ready_to_read() { for (size_t i = 0; i < _sender_queues.size(); i++) { if (_sender_queues[i]->should_wait()) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 1fc635a7f7..66941dae48 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -68,6 +68,8 @@ public: void add_block(Block* block, int sender_id, bool use_move); + bool sender_queue_empty(int sender_id); + bool ready_to_read(); Status get_next(Block* block, bool* eos); @@ -174,6 +176,8 @@ public: void close(); + bool queue_empty() { return _block_queue_empty; } + protected: virtual void _update_block_queue_empty() {} Status _inner_get_batch(Block* block, bool* eos); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 5bf116af12..d0357cdbcc 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -233,8 +233,7 @@ public: _need_close(false), _brpc_dest_addr(brpc_dest), _is_transfer_chain(is_transfer_chain), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch), - _capacity(std::max(1, buffer_size / std::max(_row_desc.get_row_size(), 1))) { + _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) { std::string localhost = BackendOptions::get_localhost(); _is_local = (_brpc_dest_addr.hostname == localhost) && (_brpc_dest_addr.port == config::brpc_port); @@ -292,8 +291,6 @@ public: return uid.to_string(); } - TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } - bool is_local() const { return _is_local; } virtual void ch_roll_pb_block(); @@ -302,7 +299,11 @@ public: if (!is_local()) { return true; } - return !_local_recvr || _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0); + + // if local recvr queue mem over the exchange node mem limit, we must ensure each queue + // has one block to do merge sort in exchange node to prevent the logic dead lock + return !_local_recvr || _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) || + _local_recvr->sender_queue_empty(_parent->_sender_id); } protected: @@ -363,7 +364,6 @@ protected: bool _send_query_statistics_with_every_batch; RuntimeState* _state; - size_t _capacity; bool _is_local; std::shared_ptr<VDataStreamRecvr> _local_recvr; // serialized blocks for broadcasting; we need two so we can write --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org