This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd5844ea0d3 [Improvement](local exchange) optimize broadcast local exchanger (#39402)
bd5844ea0d3 is described below

commit bd5844ea0d37ddc8d31673e06d2e983a58f7429f
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Aug 19 11:50:37 2024 +0800

    [Improvement](local exchange) optimize broadcast local exchanger (#39402)
    
    Previously, the broadcast local exchanger copied every data block on
    the sink side, once for each downstream source operator. This PR
    switches it to a copy-when-pull mode: the sink enqueues a single
    shared, reference-counted block for all source operators, and each
    source operator copies the rows only when it pulls that block.
---
 .../local_exchange_source_operator.cpp             | 21 +++++++-
 .../local_exchange_source_operator.h               |  2 +
 be/src/pipeline/local_exchange/local_exchanger.cpp | 56 +++++++++++++++-------
 be/src/pipeline/local_exchange/local_exchanger.h   | 11 +++--
 4 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 0cffe125a1f..2d20b8f365c 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -36,6 +36,18 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
         _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
     }
 
+    if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && 
_channel_id == 0) {
+        _local_merge_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+        DCHECK_GT(_local_merge_deps.size(), 1);
+        _deps_counter.resize(_local_merge_deps.size());
+        static const std::string timer_name = "WaitForDependencyTime";
+        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
+        for (size_t i = 0; i < _deps_counter.size(); i++) {
+            _deps_counter[i] = _runtime_profile->add_nonzero_counter(
+                    fmt::format("WaitForData{}", i), TUnit ::TIME_NS, 
timer_name, 1);
+        }
+    }
+
     return Status::OK();
 }
 
@@ -44,6 +56,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
         return Status::OK();
     }
 
+    for (size_t i = 0; i < _local_merge_deps.size(); i++) {
+        COUNTER_SET(_deps_counter[i], 
_local_merge_deps[i]->watcher_elapse_time());
+    }
+
     if (_exchanger) {
         _exchanger->close(*this);
     }
@@ -51,6 +67,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
         _shared_state->sub_running_source_operators(*this);
     }
 
+    std::vector<DependencySPtr> {}.swap(_local_merge_deps);
     return Base::close(state);
 }
 
@@ -60,9 +77,9 @@ std::vector<Dependency*> 
LocalExchangeSourceLocalState::dependencies() const {
         // set dependencies ready
         std::vector<Dependency*> deps;
         auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
-        DCHECK_GT(le_deps.size(), 1);
+        DCHECK_GT(_local_merge_deps.size(), 1);
         // If this is a local merge exchange, we should use all dependencies 
here.
-        for (auto& dep : le_deps) {
+        for (auto& dep : _local_merge_deps) {
             deps.push_back(dep.get());
         }
         return deps;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index f6c043d44e4..7bf92add63d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -57,6 +57,8 @@ private:
     int _channel_id;
     RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
     RuntimeProfile::Counter* _copy_data_timer = nullptr;
+    std::vector<RuntimeProfile::Counter*> _deps_counter;
+    std::vector<DependencySPtr> _local_merge_deps;
 };
 
 class LocalExchangeSourceOperatorX final : public 
OperatorX<LocalExchangeSourceLocalState> {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index e256419688e..e10da2beb72 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -35,7 +35,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int 
channel_id,
     // PartitionedBlock will be push into multiple queues with different row 
ranges, so it will be
     // referenced multiple times. Otherwise, we only ref the block once 
because it is only push into
     // one queue.
-    if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+    if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+                  std::is_same_v<BroadcastBlock, BlockType>) {
         allocated_bytes = block.first->data_block.allocated_bytes();
     } else {
         block->ref(1);
@@ -50,7 +51,8 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int 
channel_id,
         local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
         // `enqueue(block)` return false iff this queue's source operator is 
already closed so we
         // just unref the block.
-        if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+        if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+                      std::is_same_v<BroadcastBlock, BlockType>) {
             block.first->unref(local_state._shared_state, allocated_bytes);
         } else {
             block->unref(local_state._shared_state, allocated_bytes);
@@ -71,7 +73,8 @@ bool 
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
                                          int channel_id) {
     bool all_finished = _running_sink_operators == 0;
     if (_data_queue[channel_id].try_dequeue(block)) {
-        if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+        if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+                      std::is_same_v<BroadcastBlock, BlockType>) {
             local_state._shared_state->sub_mem_usage(channel_id,
                                                      
block.first->data_block.allocated_bytes());
         } else {
@@ -86,7 +89,8 @@ bool 
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
     } else {
         std::unique_lock l(_m);
         if (_data_queue[channel_id].try_dequeue(block)) {
-            if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+            if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+                          std::is_same_v<BroadcastBlock, BlockType>) {
                 local_state._shared_state->sub_mem_usage(channel_id,
                                                          
block.first->data_block.allocated_bytes());
             } else {
@@ -135,7 +139,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
     PartitionedBlock partitioned_block;
     vectorized::MutableBlock mutable_block;
 
-    auto get_data = [&](vectorized::Block* result_block) -> Status {
+    auto get_data = [&]() -> Status {
         do {
             const auto* offset_start = 
partitioned_block.second.row_idxs->data() +
                                        partitioned_block.second.offset_start;
@@ -152,7 +156,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
-        RETURN_IF_ERROR(get_data(block));
+        RETURN_IF_ERROR(get_data());
     }
     return Status::OK();
 }
@@ -374,30 +378,50 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* 
state, vectorized::Block
 
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                                 LocalExchangeSinkLocalState& local_state) {
+    if (in_block->empty()) {
+        return Status::OK();
+    }
+    vectorized::Block new_block;
+    if (!_free_blocks.try_dequeue(new_block)) {
+        new_block = {in_block->clone_empty()};
+    }
+    new_block.swap(*in_block);
+    auto wrapper = BlockWrapper::create_shared(std::move(new_block));
+    
local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes());
+    wrapper->ref(_num_partitions);
     for (size_t i = 0; i < _num_partitions; i++) {
-        auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
-        RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
-        _enqueue_data_and_set_ready(i, local_state,
-                                    
BlockWrapper::create_shared(mutable_block->to_block()));
+        _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, 
wrapper->data_block.rows()}});
     }
 
     return Status::OK();
 }
 
 void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
-    vectorized::Block next_block;
+    BroadcastBlock partitioned_block;
     bool eos;
-    BlockWrapperSPtr wrapper;
+    vectorized::Block block;
     _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
-        next_block = vectorized::Block();
+    while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
+        partitioned_block.first->unref(local_state._shared_state);
     }
 }
 
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
-    BlockWrapperSPtr next_block;
-    _dequeue_data(local_state, next_block, eos, block);
+    BroadcastBlock partitioned_block;
+
+    if (_dequeue_data(local_state, partitioned_block, eos, block)) {
+        SCOPED_TIMER(local_state._copy_data_timer);
+        vectorized::MutableBlock mutable_block =
+                vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                        block, partitioned_block.first->data_block);
+        auto block_wrapper = partitioned_block.first;
+        RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block,
+                                               
partitioned_block.second.offset_start,
+                                               
partitioned_block.second.length));
+        block_wrapper->unref(local_state._shared_state);
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index dfb5c31fff8..71c388b2323 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -94,6 +94,12 @@ struct PartitionedRowIdxs {
 
 using PartitionedBlock = std::pair<std::shared_ptr<BlockWrapper>, 
PartitionedRowIdxs>;
 
+struct RowRange {
+    uint32_t offset_start;
+    size_t length;
+};
+using BroadcastBlock = std::pair<std::shared_ptr<BlockWrapper>, RowRange>;
+
 template <typename BlockType>
 struct BlockQueue {
     std::atomic<bool> eos = false;
@@ -304,12 +310,11 @@ private:
     std::vector<std::atomic_int64_t> _queues_mem_usege;
 };
 
-class BroadcastExchanger final : public Exchanger<BlockWrapperSPtr> {
+class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
     BroadcastExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
-                                          free_block_limit) {
+            : Exchanger<BroadcastBlock>(running_sink_operators, 
num_partitions, free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~BroadcastExchanger() override = default;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to