This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd5844ea0d3 [Improvement](local exchange) optimize broadcast local exchanger (#39402) bd5844ea0d3 is described below commit bd5844ea0d37ddc8d31673e06d2e983a58f7429f Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Aug 19 11:50:37 2024 +0800 [Improvement](local exchange) optimize broadcast local exchanger (#39402) Currently, data blocks are sunk by sink operators and copied to multiple downstream source operators in the broadcast local exchanger. This PR changes it to copy-when-pull mode, which means each source operator copies the data block only when it pulls it. --- .../local_exchange_source_operator.cpp | 21 +++++++- .../local_exchange_source_operator.h | 2 + be/src/pipeline/local_exchange/local_exchanger.cpp | 56 +++++++++++++++------- be/src/pipeline/local_exchange/local_exchanger.h | 11 +++-- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 0cffe125a1f..2d20b8f365c 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -36,6 +36,18 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); } + if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && _channel_id == 0) { + _local_merge_deps = _shared_state->get_dep_by_channel_id(_channel_id); + DCHECK_GT(_local_merge_deps.size(), 1); + _deps_counter.resize(_local_merge_deps.size()); + static const std::string timer_name = "WaitForDependencyTime"; + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); + for (size_t i = 0; i < _deps_counter.size(); i++) { + _deps_counter[i] = _runtime_profile->add_nonzero_counter( + fmt::format("WaitForData{}", i), TUnit ::TIME_NS, timer_name, 1); 
+ } + } + return Status::OK(); } @@ -44,6 +56,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { return Status::OK(); } + for (size_t i = 0; i < _local_merge_deps.size(); i++) { + COUNTER_SET(_deps_counter[i], _local_merge_deps[i]->watcher_elapse_time()); + } + if (_exchanger) { _exchanger->close(*this); } @@ -51,6 +67,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { _shared_state->sub_running_source_operators(*this); } + std::vector<DependencySPtr> {}.swap(_local_merge_deps); return Base::close(state); } @@ -60,9 +77,9 @@ std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const { // set dependencies ready std::vector<Dependency*> deps; auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id); - DCHECK_GT(le_deps.size(), 1); + DCHECK_GT(_local_merge_deps.size(), 1); // If this is a local merge exchange, we should use all dependencies here. - for (auto& dep : le_deps) { + for (auto& dep : _local_merge_deps) { deps.push_back(dep.get()); } return deps; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index f6c043d44e4..7bf92add63d 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -57,6 +57,8 @@ private: int _channel_id; RuntimeProfile::Counter* _get_block_failed_counter = nullptr; RuntimeProfile::Counter* _copy_data_timer = nullptr; + std::vector<RuntimeProfile::Counter*> _deps_counter; + std::vector<DependencySPtr> _local_merge_deps; }; class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceLocalState> { diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index e256419688e..e10da2beb72 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -35,7 
+35,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, // PartitionedBlock will be push into multiple queues with different row ranges, so it will be // referenced multiple times. Otherwise, we only ref the block once because it is only push into // one queue. - if constexpr (std::is_same_v<PartitionedBlock, BlockType>) { + if constexpr (std::is_same_v<PartitionedBlock, BlockType> || + std::is_same_v<BroadcastBlock, BlockType>) { allocated_bytes = block.first->data_block.allocated_bytes(); } else { block->ref(1); @@ -50,7 +51,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes); // `enqueue(block)` return false iff this queue's source operator is already closed so we // just unref the block. - if constexpr (std::is_same_v<PartitionedBlock, BlockType>) { + if constexpr (std::is_same_v<PartitionedBlock, BlockType> || + std::is_same_v<BroadcastBlock, BlockType>) { block.first->unref(local_state._shared_state, allocated_bytes); } else { block->unref(local_state._shared_state, allocated_bytes); @@ -71,7 +73,8 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st int channel_id) { bool all_finished = _running_sink_operators == 0; if (_data_queue[channel_id].try_dequeue(block)) { - if constexpr (std::is_same_v<PartitionedBlock, BlockType>) { + if constexpr (std::is_same_v<PartitionedBlock, BlockType> || + std::is_same_v<BroadcastBlock, BlockType>) { local_state._shared_state->sub_mem_usage(channel_id, block.first->data_block.allocated_bytes()); } else { @@ -86,7 +89,8 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st } else { std::unique_lock l(_m); if (_data_queue[channel_id].try_dequeue(block)) { - if constexpr (std::is_same_v<PartitionedBlock, BlockType>) { + if constexpr (std::is_same_v<PartitionedBlock, BlockType> || + std::is_same_v<BroadcastBlock, BlockType>) { 
local_state._shared_state->sub_mem_usage(channel_id, block.first->data_block.allocated_bytes()); } else { @@ -135,7 +139,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block PartitionedBlock partitioned_block; vectorized::MutableBlock mutable_block; - auto get_data = [&](vectorized::Block* result_block) -> Status { + auto get_data = [&]() -> Status { do { const auto* offset_start = partitioned_block.second.row_idxs->data() + partitioned_block.second.offset_start; @@ -152,7 +156,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); - RETURN_IF_ERROR(get_data(block)); + RETURN_IF_ERROR(get_data()); } return Status::OK(); } @@ -374,30 +378,50 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { + if (in_block->empty()) { + return Status::OK(); + } + vectorized::Block new_block; + if (!_free_blocks.try_dequeue(new_block)) { + new_block = {in_block->clone_empty()}; + } + new_block.swap(*in_block); + auto wrapper = BlockWrapper::create_shared(std::move(new_block)); + local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes()); + wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { - auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); - RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); - _enqueue_data_and_set_ready(i, local_state, - BlockWrapper::create_shared(mutable_block->to_block())); + _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}}); } return Status::OK(); } void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) 
{ - vectorized::Block next_block; + BroadcastBlock partitioned_block; bool eos; - BlockWrapperSPtr wrapper; + vectorized::Block block; _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { - next_block = vectorized::Block(); + while (_dequeue_data(local_state, partitioned_block, &eos, &block)) { + partitioned_block.first->unref(local_state._shared_state); } } Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { - BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + BroadcastBlock partitioned_block; + + if (_dequeue_data(local_state, partitioned_block, eos, block)) { + SCOPED_TIMER(local_state._copy_data_timer); + vectorized::MutableBlock mutable_block = + vectorized::VectorizedUtils::build_mutable_mem_reuse_block( + block, partitioned_block.first->data_block); + auto block_wrapper = partitioned_block.first; + RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, + partitioned_block.second.offset_start, + partitioned_block.second.length)); + block_wrapper->unref(local_state._shared_state); + } + return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index dfb5c31fff8..71c388b2323 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -94,6 +94,12 @@ struct PartitionedRowIdxs { using PartitionedBlock = std::pair<std::shared_ptr<BlockWrapper>, PartitionedRowIdxs>; +struct RowRange { + uint32_t offset_start; + size_t length; +}; +using BroadcastBlock = std::pair<std::shared_ptr<BlockWrapper>, RowRange>; + template <typename BlockType> struct BlockQueue { std::atomic<bool> eos = false; @@ -304,12 +310,11 @@ private: std::vector<std::atomic_int64_t> _queues_mem_usege; }; -class BroadcastExchanger final : public 
Exchanger<BlockWrapperSPtr> { +class BroadcastExchanger final : public Exchanger<BroadcastBlock> { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, - free_block_limit) { + : Exchanger<BroadcastBlock>(running_sink_operators, num_partitions, free_block_limit) { _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org