This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new db0a43bad2e [Chore](exchange) change LocalExchangeSharedState:mem_usage signed ty… (#37981)
db0a43bad2e is described below

commit db0a43bad2ecf46974f90d2c3906d258abab5f43
Author: Pxl <pxl...@qq.com>
AuthorDate: Wed Jul 17 13:46:51 2024 +0800

    [Chore](exchange) change LocalExchangeSharedState:mem_usage signed ty… (#37981)
    
    pick from #36682
---
 be/src/pipeline/pipeline_x/dependency.h            | 17 ++--
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 95 ++++++++++------------
 2 files changed, 47 insertions(+), 65 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index b60b3e9ae3b..c82104f7535 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -365,9 +365,8 @@ public:
     std::vector<size_t> make_nullable_keys;
 
     struct MemoryRecord {
-        MemoryRecord() : used_in_arena(0), used_in_state(0) {}
-        int64_t used_in_arena;
-        int64_t used_in_state;
+        int64_t used_in_arena {};
+        int64_t used_in_state {};
     };
     MemoryRecord mem_usage_record;
     bool agg_data_created_without_key = false;
@@ -754,14 +753,9 @@ struct DataDistribution {
     DataDistribution(ExchangeType type) : distribution_type(type) {}
     DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
             : distribution_type(type), partition_exprs(partition_exprs_) {}
-    DataDistribution(const DataDistribution& other)
-            : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {}
+    DataDistribution(const DataDistribution& other) = default;
     bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
-    DataDistribution& operator=(const DataDistribution& other) {
-        distribution_type = other.distribution_type;
-        partition_exprs = other.partition_exprs;
-        return *this;
-    }
+    DataDistribution& operator=(const DataDistribution& other) = default;
     ExchangeType distribution_type;
     std::vector<TExpr> partition_exprs;
 };
@@ -774,7 +768,8 @@ public:
     LocalExchangeSharedState(int num_instances);
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
-    std::atomic<size_t> mem_usage = 0;
+    std::atomic<int64_t> mem_usage = 0;
+    // We need to make sure to add mem_usage first and then enqueue; otherwise, a concurrent dequeue may sub mem_usage first and briefly drive mem_usage negative.
     std::mutex le_lock;
     void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) {
         for (size_t i = 0; i < source_deps.size(); i++) {
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index f02fd0e5f04..7a044aaa77f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -72,20 +72,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
         return Status::OK();
     };
 
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
-                    block, partitioned_block.first->data_block);
-            RETURN_IF_ERROR(get_data(block));
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -142,6 +136,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(it.second);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            it.second, new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -160,6 +156,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                             {new_block_wrapper, {row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(i % _num_sources);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
+                            false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -179,6 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(map[i]);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            map[i], new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -198,9 +199,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -218,25 +222,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                        LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-                _free_blocks.enqueue(std::move(next_block));
-            }
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -262,14 +257,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
         return Status::OK();
     }
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[0].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[0].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[0].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -301,14 +293,11 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                      LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -325,9 +314,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -383,9 +375,13 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                     vectorized::MutableBlock::create_unique(block->clone_empty());
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
-            local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
+
+            size_t memory_usage = new_block.allocated_bytes();
+            local_state._shared_state->add_mem_usage(i, memory_usage);
             if (data_queue[i].enqueue(std::move(new_block))) {
                 local_state._shared_state->set_ready_to_read(i);
+            } else {
+                local_state._shared_state->sub_mem_usage(i, memory_usage);
             }
         }
     }
@@ -408,25 +404,16 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                               LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-                _free_blocks.enqueue(std::move(next_block));
-            }
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();


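Likewise, a hedged skeleton of the dequeue-first control flow every get_block in this patch is rewritten to use (again with illustrative names and signature): the sink count is snapshotted first, the queue is always tried, and eos is reported only once the queue is drained and no sink remains.

    #include <atomic>
    #include <utility>

    // Skeleton of the refactored get_block flow; not the actual Doris signature.
    template <typename Queue, typename Block>
    void get_block(Queue& queue, std::atomic<int>& running_sink_operators,
                   Block* out, bool* eos) {
        Block next;
        // Snapshot before trying the queue: if a sink enqueues a block and then
        // exits between the two checks, all_finished is still false here, so we
        // block and retry instead of reporting eos over queued data.
        const bool all_finished = running_sink_operators.load() == 0;
        if (queue.try_dequeue(next)) {
            *out = std::move(next); // drain queued blocks even after sinks finish
        } else if (all_finished) {
            *eos = true; // queue empty and no producer remains
        } else {
            // Queue momentarily empty while sinks still run: the real code bumps
            // _get_block_failed_counter and blocks on the dependency.
        }
    }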
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
