This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 70da7856f4d [Improvement](local shuffle) Reduce locking scope in local exchanger … (#46293)
70da7856f4d is described below

commit 70da7856f4db91b21f40053cefa7b9c4e9102e52
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Jan 2 20:07:14 2025 +0800

    [Improvement](local shuffle) Reduce locking scope in local exchanger … (#46293)
    
    …(#46251)
    
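    Reduce the lock scope from a single exchanger-wide mutex to one mutex per
    data queue (indexed by channel id), so that enqueue/dequeue operations on
    different channels no longer serialize on the same lock.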
---
 be/src/pipeline/local_exchange/local_exchanger.cpp |  4 +--
 be/src/pipeline/local_exchange/local_exchanger.h   | 42 ++++++++++------------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 824843d970c..fa34b6a4040 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -42,7 +42,7 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int 
channel_id,
         block->ref(1);
         allocated_bytes = block->data_block.allocated_bytes();
     }
-    std::unique_lock l(_m);
+    std::unique_lock l(*_m[channel_id]);
     local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
                                              !std::is_same_v<PartitionedBlock, 
BlockType> &&
                                                      
!std::is_same_v<BroadcastBlock, BlockType>);
@@ -90,7 +90,7 @@ bool 
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
     } else if (all_finished) {
         *eos = true;
     } else {
-        std::unique_lock l(_m);
+        std::unique_lock l(*_m[channel_id]);
         if (_data_queue[channel_id].try_dequeue(block)) {
             if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                           std::is_same_v<BroadcastBlock, BlockType>) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 274e7b404aa..9909161bd26 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -142,9 +142,20 @@ template <typename BlockType>
 class Exchanger : public ExchangerBase {
 public:
     Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : ExchangerBase(running_sink_operators, num_partitions, 
free_block_limit) {}
+            : ExchangerBase(running_sink_operators, num_partitions, 
free_block_limit) {
+        _data_queue.resize(num_partitions);
+        _m.resize(num_partitions);
+        for (size_t i = 0; i < num_partitions; i++) {
+            _m[i] = std::make_unique<std::mutex>();
+        }
+    }
     Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit)
             : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit) {
+        _data_queue.resize(num_sources);
+        _m.resize(num_sources);
+        for (size_t i = 0; i < num_sources; i++) {
+            _m[i] = std::make_unique<std::mutex>();
+        }
     }
     ~Exchanger() override = default;
     std::string data_queue_debug_string(int i) override {
@@ -161,9 +172,7 @@ protected:
     bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos,
                        vectorized::Block* data_block, int channel_id);
     std::vector<BlockQueue<BlockType>> _data_queue;
-
-private:
-    std::mutex _m;
+    std::vector<std::unique_ptr<std::mutex>> _m;
 };
 
 class LocalExchangeSourceLocalState;
@@ -217,7 +226,6 @@ public:
                                           free_block_limit) {
         DCHECK_GT(num_partitions, 0);
         DCHECK_GT(num_sources, 0);
-        _data_queue.resize(num_sources);
     }
     ~ShuffleExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -238,9 +246,7 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                            int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
-                               free_block_limit) {
-        DCHECK_GT(num_partitions, 0);
-    }
+                               free_block_limit) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
@@ -250,9 +256,7 @@ public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
     PassthroughExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {
-        _data_queue.resize(num_partitions);
-    }
+                                          free_block_limit) {}
     ~PassthroughExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -268,9 +272,7 @@ public:
     ENABLE_FACTORY_CREATOR(PassToOneExchanger);
     PassToOneExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {
-        _data_queue.resize(num_partitions);
-    }
+                                          free_block_limit) {}
     ~PassToOneExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -287,9 +289,7 @@ public:
     LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
                             int running_sink_operators, int num_partitions, 
int free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit),
-              _sort_source(std::move(sort_source)) {
-        _data_queue.resize(num_partitions);
-    }
+              _sort_source(std::move(sort_source)) {}
     ~LocalMergeSortExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -313,9 +313,7 @@ class BroadcastExchanger final : public 
Exchanger<BroadcastBlock> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
     BroadcastExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<BroadcastBlock>(running_sink_operators, 
num_partitions, free_block_limit) {
-        _data_queue.resize(num_partitions);
-    }
+            : Exchanger<BroadcastBlock>(running_sink_operators, 
num_partitions, free_block_limit) {}
     ~BroadcastExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -334,9 +332,7 @@ public:
     AdaptivePassthroughExchanger(int running_sink_operators, int 
num_partitions,
                                  int free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {
-        _data_queue.resize(num_partitions);
-    }
+                                          free_block_limit) {}
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                 LocalExchangeSinkLocalState& local_state) override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to