This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a95d58b7d1a [Chore](exchange) change 
LocalExchangeSharedState:mem_usage signed type to avoid query … (#36682)
a95d58b7d1a is described below

commit a95d58b7d1a5d74813e0fa6adb7e7d4b4db7e845
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Jun 24 10:00:59 2024 +0800

    [Chore](exchange) change LocalExchangeSharedState:mem_usage signed type to 
avoid query … (#36682)
    
    …blocked when negative mem_usage
    
    ## Proposed changes
    Change LocalExchangeSharedState::mem_usage to a signed type to avoid the
    query being blocked when mem_usage becomes negative.
---
 be/src/pipeline/dependency.h                       |  40 +++-----
 be/src/pipeline/local_exchange/local_exchanger.cpp | 114 +++++++++++----------
 be/src/pipeline/local_exchange/local_exchanger.h   |   1 +
 3 files changed, 75 insertions(+), 80 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0f9c698a82e..5214022db13 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -336,9 +336,8 @@ public:
     std::vector<size_t> make_nullable_keys;
 
     struct MemoryRecord {
-        MemoryRecord() : used_in_arena(0), used_in_state(0) {}
-        int64_t used_in_arena;
-        int64_t used_in_state;
+        int64_t used_in_arena {};
+        int64_t used_in_state {};
     };
     MemoryRecord mem_usage_record;
     bool agg_data_created_without_key = false;
@@ -362,11 +361,7 @@ public:
                   _order_directions(order_directions),
                   _null_directions(null_directions) {}
 
-        HeapLimitCursor(const HeapLimitCursor& other) noexcept
-                : _row_id(other._row_id),
-                  _limit_columns(other._limit_columns),
-                  _order_directions(other._order_directions),
-                  _null_directions(other._null_directions) {}
+        HeapLimitCursor(const HeapLimitCursor& other) = default;
 
         HeapLimitCursor(HeapLimitCursor&& other) noexcept
                 : _row_id(other._row_id),
@@ -567,11 +562,10 @@ public:
 };
 
 struct BlockRowPos {
-    BlockRowPos() : block_num(0), row_num(0), pos(0) {}
-    int64_t block_num; //the pos at which block
-    int64_t row_num;   //the pos at which row
-    int64_t pos;       //pos = all blocks size + row_num
-    std::string debug_string() {
+    int64_t block_num {}; //the pos at which block
+    int64_t row_num {};   //the pos at which row
+    int64_t pos {};       //pos = all blocks size + row_num
+    std::string debug_string() const {
         std::string res = "\t block_num: ";
         res += std::to_string(block_num);
         res += "\t row_num: ";
@@ -823,14 +817,9 @@ struct DataDistribution {
     DataDistribution(ExchangeType type) : distribution_type(type) {}
     DataDistribution(ExchangeType type, const std::vector<TExpr>& 
partition_exprs_)
             : distribution_type(type), partition_exprs(partition_exprs_) {}
-    DataDistribution(const DataDistribution& other)
-            : distribution_type(other.distribution_type), 
partition_exprs(other.partition_exprs) {}
+    DataDistribution(const DataDistribution& other) = default;
     bool need_local_exchange() const { return distribution_type != 
ExchangeType::NOOP; }
-    DataDistribution& operator=(const DataDistribution& other) {
-        distribution_type = other.distribution_type;
-        partition_exprs = other.partition_exprs;
-        return *this;
-    }
+    DataDistribution& operator=(const DataDistribution& other) = default;
     ExchangeType distribution_type;
     std::vector<TExpr> partition_exprs;
 };
@@ -843,13 +832,14 @@ public:
     LocalExchangeSharedState(int num_instances);
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
-    std::atomic<size_t> mem_usage = 0;
+    std::atomic<int64_t> mem_usage = 0;
+    // We need to make sure to add mem_usage first and then enqueue, otherwise 
sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
     void create_source_dependencies(int operator_id, int node_id) {
-        for (size_t i = 0; i < source_deps.size(); i++) {
-            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
-                                                          
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
-            source_deps[i]->set_shared_state(this);
+        for (auto& source_dep : source_deps) {
+            source_dep = std::make_shared<Dependency>(operator_id, node_id,
+                                                      
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+            source_dep->set_shared_state(this);
         }
     };
     void sub_running_sink_operators();
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 51d2c8268e7..27b7fc7e7fd 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -74,20 +74,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         return Status::OK();
     };
 
-    if (_running_sink_operators == 0) {
-        if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
-                    block, partitioned_block.first->data_block);
-            RETURN_IF_ERROR(get_data(block));
-        } else {
-            *eos = true;
-        }
-    } else if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -144,6 +138,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 if (data_queue[it.second].enqueue({new_block_wrapper, 
{row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(it.second);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -162,6 +158,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                             {new_block_wrapper, {row_idx, start, size}})) {
                     local_state._shared_state->set_ready_to_read(i % 
_num_sources);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(),
+                            false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -181,6 +180,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}})) {
                     local_state._shared_state->set_ready_to_read(map[i]);
                 } else {
+                    local_state._shared_state->sub_mem_usage(
+                            map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
                     new_block_wrapper->unref(local_state._shared_state);
                 }
             } else {
@@ -200,9 +201,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -220,25 +224,16 @@ void 
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) 
{
-                _free_blocks.enqueue(std::move(next_block));
-            }
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -264,14 +259,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         return Status::OK();
     }
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[0].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[0].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[0].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -287,10 +279,14 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, 
vectorized::Block* in_
     }
     new_block.swap(*in_block);
     DCHECK_LE(local_state._channel_id, _data_queue.size());
-    add_mem_usage(local_state, new_block.allocated_bytes());
+
+    size_t memory_usage = new_block.allocated_bytes();
+    add_mem_usage(local_state, memory_usage);
 
     if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(0);
+    } else {
+        sub_mem_usage(local_state, memory_usage);
     }
     if (eos) {
         _queue_deps[local_state._channel_id]->set_always_ready();
@@ -350,6 +346,19 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* 
state, vectorized::Block
     return Status::OK();
 }
 
+void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState& 
local_state,
+                                            int64_t delta) {
+    const auto channel_id = local_state._channel_id;
+    local_state._shared_state->mem_trackers[channel_id]->release(delta);
+    if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
+        _sink_deps[channel_id]->set_ready();
+    }
+    // if queue empty , block this queue
+    if (_queues_mem_usege[channel_id] == 0) {
+        _queue_deps[channel_id]->block();
+    }
+}
+
 void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& 
local_state,
                                             int64_t delta) {
     const auto channel_id = local_state._channel_id;
@@ -412,14 +421,11 @@ void 
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            *block = std::move(next_block);
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         *block = std::move(next_block);
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -436,9 +442,12 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
+    size_t memory_usage = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
     }
 
     return Status::OK();
@@ -494,9 +503,13 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                     
vectorized::MutableBlock::create_unique(block->clone_empty());
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
-            local_state._shared_state->add_mem_usage(i, 
new_block.allocated_bytes());
+
+            size_t memory_usage = new_block.allocated_bytes();
+            local_state._shared_state->add_mem_usage(i, memory_usage);
             if (data_queue[i].enqueue(std::move(new_block))) {
                 local_state._shared_state->set_ready_to_read(i);
+            } else {
+                local_state._shared_state->sub_mem_usage(i, memory_usage);
             }
         }
     }
@@ -519,25 +532,16 @@ Status 
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                                LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            if (_free_block_limit == 0 ||
-                _free_blocks.size_approx() < _free_block_limit * _num_sources) 
{
-                _free_blocks.enqueue(std::move(next_block));
-            }
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    bool all_finished = _running_sink_operators == 0;
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    } else if (all_finished) {
+        *eos = true;
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 741b86aa8bb..2c4f8f5b785 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -256,6 +256,7 @@ public:
     std::vector<Dependency*> local_state_dependency(int channel_id) override;
 
     void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
+    void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
     void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int 
channel_id, int64_t delta);
     void close(LocalExchangeSourceLocalState& local_state) override {}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to