This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 70da7856f4d [Improvement](local shuffle) Reduce locking scope in local exchanger … (#46293) 70da7856f4d is described below commit 70da7856f4db91b21f40053cefa7b9c4e9102e52 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Jan 2 20:07:14 2025 +0800 [Improvement](local shuffle) Reduce locking scope in local exchanger … (#46293) …(#46251) Reduce lock scope from global level to data queue level. --- be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +-- be/src/pipeline/local_exchange/local_exchanger.h | 42 ++++++++++------------ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 824843d970c..fa34b6a4040 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -42,7 +42,7 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, block->ref(1); allocated_bytes = block->data_block.allocated_bytes(); } - std::unique_lock l(_m); + std::unique_lock l(*_m[channel_id]); local_state._shared_state->add_mem_usage(channel_id, allocated_bytes, !std::is_same_v<PartitionedBlock, BlockType> && !std::is_same_v<BroadcastBlock, BlockType>); @@ -90,7 +90,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st } else if (all_finished) { *eos = true; } else { - std::unique_lock l(_m); + std::unique_lock l(*_m[channel_id]); if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v<PartitionedBlock, BlockType> || std::is_same_v<BroadcastBlock, BlockType>) { diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 274e7b404aa..9909161bd26 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -142,9 +142,20 @@ template <typename BlockType> class Exchanger : public ExchangerBase { public: Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {} + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { + _data_queue.resize(num_partitions); + _m.resize(num_partitions); + for (size_t i = 0; i < num_partitions; i++) { + _m[i] = std::make_unique<std::mutex>(); + } + } Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + _data_queue.resize(num_sources); + _m.resize(num_sources); + for (size_t i = 0; i < num_sources; i++) { + _m[i] = std::make_unique<std::mutex>(); + } } ~Exchanger() override = default; std::string data_queue_debug_string(int i) override { @@ -161,9 +172,7 @@ protected: bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); std::vector<BlockQueue<BlockType>> _data_queue; - -private: - std::mutex _m; + std::vector<std::unique_ptr<std::mutex>> _m; }; class LocalExchangeSourceLocalState; @@ -217,7 +226,6 @@ public: free_block_limit) { DCHECK_GT(num_partitions, 0); DCHECK_GT(num_sources, 0); - _data_queue.resize(num_sources); } ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, @@ -238,9 +246,7 @@ class BucketShuffleExchanger final : public ShuffleExchanger { BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, - free_block_limit) { - DCHECK_GT(num_partitions, 0); - } + free_block_limit) {} ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; @@ -250,9 +256,7 @@ public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~PassthroughExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -268,9 +272,7 @@ public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~PassToOneExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -287,9 +289,7 @@ public: LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source, int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit), - _sort_source(std::move(sort_source)) { - _data_queue.resize(num_partitions); - } + _sort_source(std::move(sort_source)) {} ~LocalMergeSortExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -313,9 +313,7 @@ class BroadcastExchanger final : public Exchanger<BroadcastBlock> { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) { - _data_queue.resize(num_partitions); - } + : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) {} ~BroadcastExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -334,9 +332,7 @@ public: AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org