This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f8cfe5e579 [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169) f8cfe5e579 is described below commit f8cfe5e57959d4352d7593f900dd70229ceb7525 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Jun 29 10:03:57 2023 +0800 [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 7 +++++-- be/src/util/ref_count_closure.h | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index e8b3f76fda..0326929e5c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -62,6 +62,8 @@ void ExchangeSinkBuffer::close() { pair.second->release_finst_id(); pair.second->release_query_id(); } + _instance_to_broadcast_package_queue.clear(); + _instance_to_package_queue.clear(); _instance_to_request.clear(); } @@ -146,7 +148,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { send_now = true; _instance_to_sending_by_pipeline[ins_id.lo] = false; } - _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request)); + _instance_to_broadcast_package_queue[ins_id.lo].emplace(request); } if (send_now) { RETURN_IF_ERROR(_send_rpc(ins_id.lo)); @@ -158,6 +160,8 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); + DCHECK(_instance_to_sending_by_pipeline[id] == false); + std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = _instance_to_broadcast_package_queue[id]; @@ -257,7 +261,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { broadcast_q.pop(); } else { _instance_to_sending_by_pipeline[id] = true; - return Status::OK(); } return Status::OK(); diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index fe6efa7615..d2fbd2fd14 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -31,7 +31,7 @@ template <typename T> class RefCountClosure : public google::protobuf::Closure { public: RefCountClosure() : _refs(0) {} - ~RefCountClosure() {} + ~RefCountClosure() override = default; void ref() { _refs.fetch_add(1); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org