This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit efb47cb9a0388c4161d85b13b5f0898d7a76568c Author: jacktengg <18241664+jackte...@users.noreply.github.com> AuthorDate: Tue Sep 10 17:52:57 2024 +0800 [fix](exchange) fix coredump of update counter if block is null --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 58ca10af644..e2619a05703 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -159,10 +159,10 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { _busy_channels++; } if (request.block) { + _parent->memory_used_counter()->update(request.block->ByteSizeLong()); RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); } - _parent->memory_used_counter()->update(request.block->ByteSizeLong()); _instance_to_package_queue[ins_id].emplace(std::move(request)); _total_queue_size++; if (_queue_dependency && _total_queue_size > _queue_capacity) { @@ -194,10 +194,11 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { _busy_channels++; } if (request.block_holder->get_block()) { + _parent->memory_used_counter()->update( + request.block_holder->get_block()->ByteSizeLong()); RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( request.block_holder->get_block()->be_exec_version())); } - _parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong()); _instance_to_broadcast_package_queue[ins_id].emplace(request); } if (send_now) { @@ -429,8 +430,10 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = _instance_to_broadcast_package_queue[id]; for (; !broadcast_q.empty(); broadcast_q.pop()) { - _parent->memory_used_counter()->update( - 
-broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + if (broadcast_q.front().block_holder->get_block()) { + _parent->memory_used_counter()->update( + -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + } } { std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty; @@ -439,7 +442,9 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; for (; !q.empty(); q.pop()) { - _parent->memory_used_counter()->update(-q.front().block->ByteSizeLong()); + if (q.front().block) { + _parent->memory_used_counter()->update(-q.front().block->ByteSizeLong()); + } } { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org