This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit efb47cb9a0388c4161d85b13b5f0898d7a76568c
Author: jacktengg <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Sep 10 17:52:57 2024 +0800

    [fix](exchange) fix coredump of update counter if block is null
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 58ca10af644..e2619a05703 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -159,10 +159,10 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
             _busy_channels++;
         }
         if (request.block) {
+            
_parent->memory_used_counter()->update(request.block->ByteSizeLong());
             RETURN_IF_ERROR(
                     
BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
         }
-        _parent->memory_used_counter()->update(request.block->ByteSizeLong());
         _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
         if (_queue_dependency && _total_queue_size > _queue_capacity) {
@@ -194,10 +194,11 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             _busy_channels++;
         }
         if (request.block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    request.block_holder->get_block()->ByteSizeLong());
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                     request.block_holder->get_block()->be_exec_version()));
         }
-        
_parent->memory_used_counter()->update(request.block_holder->get_block()->ByteSizeLong());
         _instance_to_broadcast_package_queue[ins_id].emplace(request);
     }
     if (send_now) {
@@ -429,8 +430,10 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
-        _parent->memory_used_counter()->update(
-                -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+        if (broadcast_q.front().block_holder->get_block()) {
+            _parent->memory_used_counter()->update(
+                    -broadcast_q.front().block_holder->get_block()->ByteSizeLong());
+        }
     }
     {
         std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
empty;
@@ -439,7 +442,9 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
     for (; !q.empty(); q.pop()) {
-        
_parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
+        if (q.front().block) {
+            
_parent->memory_used_counter()->update(-q.front().block->ByteSizeLong());
+        }
     }
 
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to