This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff090ff641c [refactor](shuffle) (PART II) Split local exchange operators from loc… (#45280)
ff090ff641c is described below

commit ff090ff641ce8e96c3db918de84875632f453623
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Dec 11 17:55:56 2024 +0800

    [refactor](shuffle) (PART II) Split local exchange operators from loc… (#45280)
    
    …al exchanger
    
    This PR decouples the local exchanger from the local exchange
    operators.

    Since all types of exchangers split data much as the exchange sink
    does, I plan to use the exchanger to split data in both local
    exchange operators and exchange sink operators.
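
    A condensed sketch of the new call shape (assembled from the diff
    below; variable names outside the diff are illustrative, error
    handling elided):

        // Timers and operator state now travel in two small structs, so an
        // exchanger can be driven with or without a local exchange operator
        // (the local_state pointer may be nullptr).
        Profile profile {compute_hash_value_timer, distribute_timer,
                         /*copy_data_timer=*/nullptr};
        SinkInfo sink {&channel_id, partitioner, local_state};
        RETURN_IF_ERROR(exchanger->sink(state, in_block, eos,
                                        std::move(profile), std::move(sink)));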
---
 be/src/pipeline/dependency.cpp                     |   5 +-
 be/src/pipeline/dependency.h                       |   2 +-
 .../local_exchange_sink_operator.cpp               |   5 +-
 .../local_exchange/local_exchange_sink_operator.h  |   1 -
 .../local_exchange_source_operator.cpp             |   8 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp | 367 +++++++++++++--------
 be/src/pipeline/local_exchange/local_exchanger.h   | 127 ++++---
 7 files changed, 325 insertions(+), 190 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5fef018423d..dcf5c7a0a81 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -179,12 +179,11 @@ void LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
-void LocalExchangeSharedState::sub_running_source_operators(
-        LocalExchangeSourceLocalState& local_state) {
+void LocalExchangeSharedState::sub_running_source_operators() {
     std::unique_lock<std::mutex> lc(le_lock);
     if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
         _set_always_ready();
-        exchanger->finalize(local_state);
+        exchanger->finalize();
     }
 }
 
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ad018c8b4f8..f1cfe2b0297 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -758,7 +758,7 @@ public:
         }
     }
     void sub_running_sink_operators();
-    void sub_running_source_operators(LocalExchangeSourceLocalState& local_state);
+    void sub_running_source_operators();
     void _set_always_ready() {
         for (auto& dep : source_deps) {
             DCHECK(dep);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 22007a4b220..b22ee9fd77e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -144,7 +144,10 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-    RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state));
+    RETURN_IF_ERROR(local_state._exchanger->sink(
+            state, in_block, eos,
+            {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr},
+            {&local_state._channel_id, local_state._partitioner.get(), &local_state}));
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 435f7a410a4..c067f023c8d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -65,7 +65,6 @@ private:
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
     RuntimeProfile::Counter* _distribute_timer = nullptr;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
-    std::vector<uint32_t> _partition_rows_histogram;
 
     // Used by random passthrough exchanger
     int _channel_id = 0;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index c4832b9958c..63e36cdfdb0 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -61,10 +61,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
     }
 
     if (_exchanger) {
-        _exchanger->close(*this);
+        _exchanger->close({_channel_id, this});
     }
     if (_shared_state) {
-        _shared_state->sub_running_source_operators(*this);
+        _shared_state->sub_running_source_operators();
     }
 
     std::vector<DependencySPtr> {}.swap(_local_merge_deps);
@@ -116,7 +116,9 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
                                                bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, eos, local_state));
+    RETURN_IF_ERROR(local_state._exchanger->get_block(
+            state, block, eos, {nullptr, nullptr, local_state._copy_data_timer},
+            {local_state._channel_id, &local_state}));
     local_state.reached_limit(block, eos);
     return Status::OK();
 }
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 647988f8b79..a963de8b684 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -29,8 +29,12 @@ namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 template <typename BlockType>
 void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
-                                                       LocalExchangeSinkLocalState& local_state,
+                                                       LocalExchangeSinkLocalState* local_state,
                                                        BlockType&& block) {
+    if (local_state == nullptr) {
+        _enqueue_data_and_set_ready(channel_id, std::move(block));
+        return;
+    }
     size_t allocated_bytes = 0;
     // PartitionedBlock is used by shuffle exchanger.
     // PartitionedBlock will be push into multiple queues with different row ranges, so it will be
@@ -44,47 +48,47 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
         allocated_bytes = block->data_block.allocated_bytes();
     }
     std::unique_lock l(_m);
-    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
-                                             !std::is_same_v<PartitionedBlock, BlockType> &&
-                                                     !std::is_same_v<BroadcastBlock, BlockType>);
+    local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes,
+                                              !std::is_same_v<PartitionedBlock, BlockType> &&
+                                                      !std::is_same_v<BroadcastBlock, BlockType>);
     if (_data_queue[channel_id].enqueue(std::move(block))) {
-        local_state._shared_state->set_ready_to_read(channel_id);
+        local_state->_shared_state->set_ready_to_read(channel_id);
     } else {
-        local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+        local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes);
        // `enqueue(block)` return false iff this queue's source operator is already closed so we
         // just unref the block.
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                       std::is_same_v<BroadcastBlock, BlockType>) {
-            block.first->unref(local_state._shared_state, allocated_bytes, channel_id);
+            block.first->unref(local_state->_shared_state, allocated_bytes, channel_id);
         } else {
-            block->unref(local_state._shared_state, allocated_bytes, channel_id);
+            block->unref(local_state->_shared_state, allocated_bytes, channel_id);
             DCHECK_EQ(block->ref_value(), 0);
         }
     }
 }
 
 template <typename BlockType>
-bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state,
-                                         BlockType& block, bool* eos,
-                                         vectorized::Block* data_block) {
-    return _dequeue_data(local_state, block, eos, data_block, local_state._channel_id);
-}
-
-template <typename BlockType>
-bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state,
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_state,
                                          BlockType& block, bool* eos, vectorized::Block* data_block,
                                          int channel_id) {
+    if (local_state == nullptr) {
+        if (!_dequeue_data(block, eos, data_block, channel_id)) {
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}",
+                            data_queue_debug_string(channel_id));
+        }
+        return true;
+    }
     bool all_finished = _running_sink_operators == 0;
     if (_data_queue[channel_id].try_dequeue(block)) {
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                       std::is_same_v<BroadcastBlock, BlockType>) {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block.first->data_block.allocated_bytes());
+            local_state->_shared_state->sub_mem_usage(channel_id,
+                                                      block.first->data_block.allocated_bytes());
         } else {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block->data_block.allocated_bytes());
+            local_state->_shared_state->sub_mem_usage(channel_id,
+                                                      block->data_block.allocated_bytes());
             data_block->swap(block->data_block);
-            block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
+            block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id);
             DCHECK_EQ(block->ref_value(), 0);
         }
         return true;
@@ -95,54 +99,88 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
         if (_data_queue[channel_id].try_dequeue(block)) {
             if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                           std::is_same_v<BroadcastBlock, BlockType>) {
-                local_state._shared_state->sub_mem_usage(channel_id,
-                                                         block.first->data_block.allocated_bytes());
+                local_state->_shared_state->sub_mem_usage(
+                        channel_id, block.first->data_block.allocated_bytes());
             } else {
-                local_state._shared_state->sub_mem_usage(channel_id,
-                                                         block->data_block.allocated_bytes());
+                local_state->_shared_state->sub_mem_usage(channel_id,
+                                                          block->data_block.allocated_bytes());
                 data_block->swap(block->data_block);
-                block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
+                block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id);
                 DCHECK_EQ(block->ref_value(), 0);
             }
             return true;
         }
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        local_state._dependency->block();
+        COUNTER_UPDATE(local_state->_get_block_failed_counter, 1);
+        local_state->_dependency->block();
+    }
+    return false;
+}
+
+template <typename BlockType>
+void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) {
+    if constexpr (!std::is_same_v<PartitionedBlock, BlockType> &&
+                  !std::is_same_v<BroadcastBlock, BlockType>) {
+        block->ref(1);
+    }
+    if (!_data_queue[channel_id].enqueue(std::move(block))) {
+        if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
+                      std::is_same_v<BroadcastBlock, BlockType>) {
+            block.first->unref();
+        } else {
+            block->unref();
+            DCHECK_EQ(block->ref_value(), 0);
+        }
+    }
+}
+
+template <typename BlockType>
+bool Exchanger<BlockType>::_dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block,
+                                         int channel_id) {
+    if (_data_queue[channel_id].try_dequeue(block)) {
+        if constexpr (!std::is_same_v<PartitionedBlock, BlockType> &&
+                      !std::is_same_v<BroadcastBlock, BlockType>) {
+            data_block->swap(block->data_block);
+            block->unref();
+            DCHECK_EQ(block->ref_value(), 0);
+        }
+        return true;
     }
     return false;
 }
 
 Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                              LocalExchangeSinkLocalState& local_state) {
+                              Profile&& profile, SinkInfo&& sink_info) {
     if (in_block->empty()) {
         return Status::OK();
     }
     {
-        SCOPED_TIMER(local_state._compute_hash_value_timer);
-        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block));
+        SCOPED_TIMER(profile.compute_hash_value_timer);
+        RETURN_IF_ERROR(sink_info.partitioner->do_partitioning(state, in_block));
     }
     {
-        SCOPED_TIMER(local_state._distribute_timer);
-        RETURN_IF_ERROR(_split_rows(state,
-                                    local_state._partitioner->get_channel_ids().get<uint32_t>(),
-                                    in_block, local_state));
+        SCOPED_TIMER(profile.distribute_timer);
+        RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get<uint32_t>(),
+                                    in_block, *sink_info.channel_id, sink_info.local_state));
     }
 
     return Status::OK();
 }
 
-void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
+void ShuffleExchanger::close(SourceInfo&& source_info) {
     PartitionedBlock partitioned_block;
     bool eos;
     vectorized::Block block;
-    _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
-        partitioned_block.first->unref(local_state._shared_state, local_state._channel_id);
+    _data_queue[source_info.channel_id].set_eos();
+    while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block,
+                         source_info.channel_id)) {
+        partitioned_block.first->unref(
+                source_info.local_state ? source_info.local_state->_shared_state : nullptr,
+                source_info.channel_id);
     }
 }
 
 Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                                   LocalExchangeSourceLocalState& local_state) {
+                                   Profile&& profile, SourceInfo&& source_info) {
     PartitionedBlock partitioned_block;
     vectorized::MutableBlock mutable_block;
 
@@ -153,14 +191,18 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
             auto block_wrapper = partitioned_block.first;
             RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start,
                                                    offset_start + partitioned_block.second.length));
-            block_wrapper->unref(local_state._shared_state, local_state._channel_id);
+            block_wrapper->unref(
+                    source_info.local_state ? source_info.local_state->_shared_state : nullptr,
+                    source_info.channel_id);
         } while (mutable_block.rows() < state->batch_size() && !*eos &&
-                 _dequeue_data(local_state, partitioned_block, eos, block));
+                 _dequeue_data(source_info.local_state, partitioned_block, eos, block,
+                               source_info.channel_id));
         return Status::OK();
     };
 
-    if (_dequeue_data(local_state, partitioned_block, eos, block)) {
-        SCOPED_TIMER(local_state._copy_data_timer);
+    if (_dequeue_data(source_info.local_state, partitioned_block, eos, block,
+                      source_info.channel_id)) {
+        SCOPED_TIMER(profile.copy_data_timer);
         mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data());
@@ -169,22 +211,25 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
 }
 
 Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
-                                     vectorized::Block* block,
-                                     LocalExchangeSinkLocalState& local_state) {
+                                     vectorized::Block* block, int channel_id,
+                                     LocalExchangeSinkLocalState* local_state) {
+    if (local_state == nullptr) {
+        return _split_rows(state, channel_ids, block, channel_id);
+    }
     const auto rows = cast_set<int32_t>(block->rows());
     auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
+    auto& partition_rows_histogram = _partition_rows_histogram[channel_id];
     {
-        local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
+        partition_rows_histogram.assign(_num_partitions + 1, 0);
         for (int32_t i = 0; i < rows; ++i) {
-            local_state._partition_rows_histogram[channel_ids[i]]++;
+            partition_rows_histogram[channel_ids[i]]++;
         }
         for (int32_t i = 1; i <= _num_partitions; ++i) {
-            local_state._partition_rows_histogram[i] +=
-                    local_state._partition_rows_histogram[i - 1];
+            partition_rows_histogram[i] += partition_rows_histogram[i - 1];
         }
         for (int32_t i = rows - 1; i >= 0; --i) {
-            (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
-            local_state._partition_rows_histogram[channel_ids[i]]--;
+            (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i;
+            partition_rows_histogram[channel_ids[i]]--;
         }
     }
 
@@ -200,10 +245,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     if (new_block_wrapper->data_block.empty()) {
         return Status::OK();
     }
-    local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+    local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
+                                                    channel_id);
     auto bucket_seq_to_instance_idx =
-            local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
+            local_state->_parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
         /**
          * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of
@@ -211,32 +256,32 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
          * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on
          * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3.
          */
-        const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
+        const auto& map = local_state->_parent->cast<LocalExchangeSinkOperatorX>()
                                   ._shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(cast_set<int>(map.size()));
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
                     << it.first << " : " << it.second << " " << _num_partitions;
-            uint32_t start = local_state._partition_rows_histogram[it.first];
-            uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
+            uint32_t start = partition_rows_histogram[it.first];
+            uint32_t size = partition_rows_histogram[it.first + 1] - start;
             if (size > 0) {
                 _enqueue_data_and_set_ready(it.second, local_state,
                                             {new_block_wrapper, {row_idx, start, size}});
             } else {
-                new_block_wrapper->unref(local_state._shared_state, local_state._channel_id);
+                new_block_wrapper->unref(local_state->_shared_state, channel_id);
             }
         }
     } else {
         DCHECK(!bucket_seq_to_instance_idx.empty());
         new_block_wrapper->ref(_num_partitions);
         for (int i = 0; i < _num_partitions; i++) {
-            uint32_t start = local_state._partition_rows_histogram[i];
-            uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = partition_rows_histogram[i];
+            uint32_t size = partition_rows_histogram[i + 1] - start;
             if (size > 0) {
             _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state,
                                         {new_block_wrapper, {row_idx, start, size}});
             } else {
-                new_block_wrapper->unref(local_state._shared_state, local_state._channel_id);
+                new_block_wrapper->unref(local_state->_shared_state, channel_id);
             }
         }
     }
@@ -244,8 +289,53 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     return Status::OK();
 }
 
+Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
+                                     vectorized::Block* block, int channel_id) {
+    const auto rows = cast_set<int32_t>(block->rows());
+    auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
+    auto& partition_rows_histogram = _partition_rows_histogram[channel_id];
+    {
+        partition_rows_histogram.assign(_num_partitions + 1, 0);
+        for (int32_t i = 0; i < rows; ++i) {
+            partition_rows_histogram[channel_ids[i]]++;
+        }
+        for (int32_t i = 1; i <= _num_partitions; ++i) {
+            partition_rows_histogram[i] += partition_rows_histogram[i - 1];
+        }
+        for (int32_t i = rows - 1; i >= 0; --i) {
+            (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i;
+            partition_rows_histogram[channel_ids[i]]--;
+        }
+    }
+
+    vectorized::Block data_block;
+    std::shared_ptr<BlockWrapper> new_block_wrapper;
+    if (_free_blocks.try_dequeue(data_block)) {
+        new_block_wrapper = BlockWrapper::create_shared(std::move(data_block));
+    } else {
+        new_block_wrapper = BlockWrapper::create_shared(block->clone_empty());
+    }
+
+    new_block_wrapper->data_block.swap(*block);
+    if (new_block_wrapper->data_block.empty()) {
+        return Status::OK();
+    }
+    new_block_wrapper->ref(cast_set<int>(_num_partitions));
+    for (int i = 0; i < _num_partitions; i++) {
+        uint32_t start = partition_rows_histogram[i];
+        uint32_t size = partition_rows_histogram[i + 1] - start;
+        if (size > 0) {
+            _enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}});
+        } else {
+            new_block_wrapper->unref();
+        }
+    }
+
+    return Status::OK();
+}
+
 Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                                  LocalExchangeSinkLocalState& local_state) {
+                                  Profile&& profile, SinkInfo&& sink_info) {
     if (in_block->empty()) {
         return Status::OK();
     }
@@ -256,41 +346,43 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
     wrapper = BlockWrapper::create_shared(std::move(new_block));
-    auto channel_id = (local_state._channel_id++) % _num_partitions;
-    _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
+    auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
+    _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper));
 
     return Status::OK();
 }
 
-void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
+void PassthroughExchanger::close(SourceInfo&& source_info) {
     vectorized::Block next_block;
     BlockWrapperSPtr wrapper;
     bool eos;
-    _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+    _data_queue[source_info.channel_id].set_eos();
+    while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block,
+                         source_info.channel_id)) {
         // do nothing
     }
 }
 
-void PassToOneExchanger::close(LocalExchangeSourceLocalState& local_state) {
+void PassToOneExchanger::close(SourceInfo&& source_info) {
     vectorized::Block next_block;
     BlockWrapperSPtr wrapper;
     bool eos;
-    _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+    _data_queue[source_info.channel_id].set_eos();
+    while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block,
+                         source_info.channel_id)) {
         // do nothing
     }
 }
 
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                                       LocalExchangeSourceLocalState& local_state) {
+                                       Profile&& profile, SourceInfo&& source_info) {
     BlockWrapperSPtr next_block;
-    _dequeue_data(local_state, next_block, eos, block);
+    _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id);
     return Status::OK();
 }
 
 Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                                LocalExchangeSinkLocalState& local_state) {
+                                Profile&& profile, SinkInfo&& sink_info) {
     if (in_block->empty()) {
         return Status::OK();
     }
@@ -301,70 +393,72 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     new_block.swap(*in_block);
 
     BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block));
-    _enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
+    _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper));
 
     return Status::OK();
 }
 
 Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                                     LocalExchangeSourceLocalState& local_state) {
-    if (local_state._channel_id != 0) {
+                                     Profile&& profile, SourceInfo&& source_info) {
+    if (source_info.channel_id != 0) {
         *eos = true;
         return Status::OK();
     }
     BlockWrapperSPtr next_block;
-    _dequeue_data(local_state, next_block, eos, block);
+    _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id);
     return Status::OK();
 }
 
 Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                                     LocalExchangeSinkLocalState& local_state) {
+                                     Profile&& profile, SinkInfo&& sink_info) {
     if (!in_block->empty()) {
         vectorized::Block new_block;
         if (!_free_blocks.try_dequeue(new_block)) {
             new_block = {in_block->clone_empty()};
         }
-        DCHECK_LE(local_state._channel_id, _data_queue.size());
+        DCHECK_LE(*sink_info.channel_id, _data_queue.size());
 
         new_block.swap(*in_block);
-        _enqueue_data_and_set_ready(local_state._channel_id, local_state,
+        _enqueue_data_and_set_ready(*sink_info.channel_id, sink_info.local_state,
                                     BlockWrapper::create_shared(std::move(new_block)));
     }
-    if (eos) {
-        local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
+    if (eos && sink_info.local_state) {
+        sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id]
+                ->set_always_ready();
     }
     return Status::OK();
 }
 
-void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) {
+void ExchangerBase::finalize() {
     DCHECK(_running_source_operators == 0);
     vectorized::Block block;
     while (_free_blocks.try_dequeue(block)) {
         // do nothing
     }
 }
-void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& local_state) {
+
+void LocalMergeSortExchanger::finalize() {
     BlockWrapperSPtr next_block;
     vectorized::Block block;
     bool eos;
     int id = 0;
     for (auto& data_queue : _data_queue) {
         data_queue.set_eos();
-        while (_dequeue_data(local_state, next_block, &eos, &block, id)) {
+        while (_dequeue_data(next_block, &eos, &block, id)) {
             block = vectorized::Block();
         }
         id++;
     }
-    ExchangerBase::finalize(local_state);
+    ExchangerBase::finalize();
 }
 
 Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
-                                             LocalExchangeSourceLocalState& local_state) {
-    RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state.profile()));
+                                             LocalExchangeSourceLocalState* local_state) {
+    RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile()));
     std::vector<vectorized::BlockSupplier> child_block_suppliers;
     for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
-        vectorized::BlockSupplier block_supplier = [&, id = channel_id](vectorized::Block* block,
-                                                                        bool* eos) {
+        vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id](
+                                                           vectorized::Block* block, bool* eos) {
             BlockWrapperSPtr next_block;
             _dequeue_data(local_state, next_block, eos, block, id);
             return Status::OK();
@@ -388,20 +482,21 @@ now
     sort(8) --> local merge(1) ---> datasink(1) [2] ---->
 */
 Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                                          LocalExchangeSourceLocalState& local_state) {
-    if (local_state._channel_id != 0) {
+                                          Profile&& profile, SourceInfo&& source_info) {
+    if (source_info.channel_id != 0) {
         *eos = true;
         return Status::OK();
     }
     if (!_merger) {
-        RETURN_IF_ERROR(build_merger(state, local_state));
+        DCHECK(source_info.local_state);
+        RETURN_IF_ERROR(build_merger(state, source_info.local_state));
     }
     RETURN_IF_ERROR(_merger->get_next(block, eos));
     return Status::OK();
 }
 
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                                LocalExchangeSinkLocalState& local_state) {
+                                Profile&& profile, SinkInfo&& sink_info) {
     if (in_block->empty()) {
         return Status::OK();
     }
@@ -411,32 +506,40 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     }
     new_block.swap(*in_block);
     auto wrapper = BlockWrapper::create_shared(std::move(new_block));
-    local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+    if (sink_info.local_state) {
+        sink_info.local_state->_shared_state->add_total_mem_usage(
+                wrapper->data_block.allocated_bytes(), *sink_info.channel_id);
+    }
+
     wrapper->ref(_num_partitions);
     for (int i = 0; i < _num_partitions; i++) {
-        _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}});
+        _enqueue_data_and_set_ready(i, sink_info.local_state,
+                                    {wrapper, {0, wrapper->data_block.rows()}});
     }
 
     return Status::OK();
 }
 
-void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
+void BroadcastExchanger::close(SourceInfo&& source_info) {
     BroadcastBlock partitioned_block;
     bool eos;
     vectorized::Block block;
-    _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
-        partitioned_block.first->unref(local_state._shared_state, local_state._channel_id);
+    _data_queue[source_info.channel_id].set_eos();
+    while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block,
+                         source_info.channel_id)) {
+        partitioned_block.first->unref(
+                source_info.local_state ? source_info.local_state->_shared_state : nullptr,
+                source_info.channel_id);
     }
 }
 
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                                     LocalExchangeSourceLocalState& local_state) {
+                                     Profile&& profile, SourceInfo&& source_info) {
     BroadcastBlock partitioned_block;
 
-    if (_dequeue_data(local_state, partitioned_block, eos, block)) {
-        SCOPED_TIMER(local_state._copy_data_timer);
+    if (_dequeue_data(source_info.local_state, partitioned_block, eos, block,
+                      source_info.channel_id)) {
+        SCOPED_TIMER(profile.copy_data_timer);
         vectorized::MutableBlock mutable_block =
                 vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                         block, partitioned_block.first->data_block);
@@ -444,7 +547,9 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
         RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block,
                                                partitioned_block.second.offset_start,
                                                partitioned_block.second.length));
-        block_wrapper->unref(local_state._shared_state, local_state._channel_id);
+        block_wrapper->unref(
+                source_info.local_state ? source_info.local_state->_shared_state : nullptr,
+                source_info.channel_id);
     }
 
     return Status::OK();
@@ -452,21 +557,21 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
 
 Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
                                                        vectorized::Block* in_block,
-                                                       LocalExchangeSinkLocalState& local_state) {
+                                                       SinkInfo&& sink_info) {
     vectorized::Block new_block;
     if (!_free_blocks.try_dequeue(new_block)) {
         new_block = {in_block->clone_empty()};
     }
     new_block.swap(*in_block);
-    auto channel_id = (local_state._channel_id++) % _num_partitions;
-    _enqueue_data_and_set_ready(channel_id, local_state,
+    auto channel_id = ((*sink_info.channel_id)++) % _num_partitions;
+    _enqueue_data_and_set_ready(channel_id, sink_info.local_state,
                                 BlockWrapper::create_shared(std::move(new_block)));
 
     return Status::OK();
 }
 
 Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block,
-                                                   LocalExchangeSinkLocalState& local_state) {
+                                                   SinkInfo&& sink_info) {
     std::vector<uint32_t> channel_ids;
     const auto num_rows = block->rows();
     channel_ids.resize(num_rows, 0);
@@ -481,40 +586,39 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz
             std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
         }
     }
-    return _split_rows(state, channel_ids.data(), block, local_state);
+    return _split_rows(state, channel_ids.data(), block, std::move(sink_info));
 }
 
 Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                                                  const uint32_t* __restrict channel_ids,
-                                                 vectorized::Block* block,
-                                                 LocalExchangeSinkLocalState& local_state) {
+                                                 vectorized::Block* block, SinkInfo&& sink_info) {
     const auto rows = cast_set<int32_t>(block->rows());
     auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+    auto& partition_rows_histogram = _partition_rows_histogram[*sink_info.channel_id];
     {
-        local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
+        partition_rows_histogram.assign(_num_partitions + 1, 0);
         for (int32_t i = 0; i < rows; ++i) {
-            local_state._partition_rows_histogram[channel_ids[i]]++;
+            partition_rows_histogram[channel_ids[i]]++;
         }
         for (int32_t i = 1; i <= _num_partitions; ++i) {
-            local_state._partition_rows_histogram[i] +=
-                    local_state._partition_rows_histogram[i - 1];
+            partition_rows_histogram[i] += partition_rows_histogram[i - 1];
         }
 
         for (int32_t i = rows - 1; i >= 0; --i) {
-            (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
-            local_state._partition_rows_histogram[channel_ids[i]]--;
+            (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i;
+            partition_rows_histogram[channel_ids[i]]--;
         }
     }
     for (int32_t i = 0; i < _num_partitions; i++) {
-        const size_t start = local_state._partition_rows_histogram[i];
-        const size_t size = local_state._partition_rows_histogram[i + 1] - start;
+        const size_t start = partition_rows_histogram[i];
+        const size_t size = partition_rows_histogram[i + 1] - start;
         if (size > 0) {
             std::unique_ptr<vectorized::MutableBlock> mutable_block =
                     vectorized::MutableBlock::create_unique(block->clone_empty());
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
 
-            _enqueue_data_and_set_ready(i, local_state,
+            _enqueue_data_and_set_ready(i, sink_info.local_state,
                                         BlockWrapper::create_shared(std::move(new_block)));
         }
     }
@@ -522,34 +626,35 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
 }
 
 Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
-                                          bool eos, LocalExchangeSinkLocalState& local_state) {
+                                          bool eos, Profile&& profile, SinkInfo&& sink_info) {
     if (in_block->empty()) {
         return Status::OK();
     }
     if (_is_pass_through) {
-        return _passthrough_sink(state, in_block, local_state);
+        return _passthrough_sink(state, in_block, std::move(sink_info));
     } else {
         if (_total_block++ > _num_partitions) {
             _is_pass_through = true;
         }
-        return _shuffle_sink(state, in_block, local_state);
+        return _shuffle_sink(state, in_block, std::move(sink_info));
     }
 }
 
 Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block,
-                                               bool* eos,
-                                               LocalExchangeSourceLocalState& local_state) {
+                                               bool* eos, Profile&& profile,
+                                               SourceInfo&& source_info) {
     BlockWrapperSPtr next_block;
-    _dequeue_data(local_state, next_block, eos, block);
+    _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id);
     return Status::OK();
 }
 
-void AdaptivePassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
+void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) {
     vectorized::Block next_block;
     bool eos;
     BlockWrapperSPtr wrapper;
-    _data_queue[local_state._channel_id].set_eos();
-    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+    _data_queue[source_info.channel_id].set_eos();
+    while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block,
+                         source_info.channel_id)) {
         // do nothing
     }
 }
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 4d699baa52f..d6871b2ba97 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -20,14 +20,33 @@
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 
-namespace doris::pipeline {
+namespace doris {
 #include "common/compile_check_begin.h"
-
+namespace vectorized {
+class PartitionerBase;
+}
+namespace pipeline {
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 struct BlockWrapper;
 class SortSourceOperatorX;
 
+struct Profile {
+    RuntimeProfile::Counter* compute_hash_value_timer = nullptr;
+    RuntimeProfile::Counter* distribute_timer = nullptr;
+    RuntimeProfile::Counter* copy_data_timer = nullptr;
+};
+
+struct SinkInfo {
+    int* channel_id;
+    vectorized::PartitionerBase* partitioner;
+    LocalExchangeSinkLocalState* local_state;
+};
+
+struct SourceInfo {
+    int channel_id;
+    LocalExchangeSourceLocalState* local_state;
+};
 /**
  * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is
  * shared by all local exchange sink operators and source operators with the same id.
@@ -60,15 +79,15 @@ public:
               _free_block_limit(free_block_limit) {}
     virtual ~ExchangerBase() = default;
     virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                             LocalExchangeSourceLocalState& local_state) = 0;
+                             Profile&& profile, SourceInfo&& source_info) = 0;
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                        LocalExchangeSinkLocalState& local_state) = 0;
+                        Profile&& profile, SinkInfo&& sink_info) = 0;
     virtual ExchangeType get_type() const = 0;
     // Called if a local exchanger source operator are closed. Free the unused data block in data_queue.
-    virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
+    virtual void close(SourceInfo&& source_info) = 0;
     // Called if all local exchanger source operators are closed. We free the memory in
     // `_free_blocks` here.
-    virtual void finalize(LocalExchangeSourceLocalState& local_state);
+    virtual void finalize();
 
     virtual std::string data_queue_debug_string(int i) = 0;
 
@@ -155,12 +174,13 @@ public:
 
 protected:
     // Enqueue data block and set downstream source operator to read.
-    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state,
+    void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state,
                                      BlockType&& block);
-    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos,
-                       vectorized::Block* data_block);
-    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos,
+    bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos,
                        vectorized::Block* data_block, int channel_id);
+
+    void _enqueue_data_and_set_ready(int channel_id, BlockType&& block);
+    bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id);
     std::vector<BlockQueue<BlockType>> _data_queue;
 
 private:
@@ -186,7 +206,7 @@ struct BlockWrapper {
     ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); }
     void ref(int delta) { ref_count += delta; }
     void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes, int channel_id) {
-        if (ref_count.fetch_sub(1) == 1) {
+        if (ref_count.fetch_sub(1) == 1 && shared_state != nullptr) {
             DCHECK_GT(allocated_bytes, 0);
             shared_state->sub_total_mem_usage(allocated_bytes, channel_id);
             if (shared_state->exchanger->_free_block_limit == 0 ||
@@ -201,7 +221,7 @@ struct BlockWrapper {
         }
     }
 
-    void unref(LocalExchangeSharedState* shared_state, int channel_id) {
+    void unref(LocalExchangeSharedState* shared_state = nullptr, int channel_id = 0) {
         unref(shared_state, data_block.allocated_bytes(), channel_id);
     }
     int ref_value() const { return ref_count.load(); }
@@ -219,19 +239,24 @@ public:
         DCHECK_GT(num_partitions, 0);
         DCHECK_GT(num_sources, 0);
         _data_queue.resize(num_sources);
+        _partition_rows_histogram.resize(running_sink_operators);
     }
     ~ShuffleExchanger() override = default;
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
-    void close(LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
+    void close(SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }
 
 protected:
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
-                       vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
+                       vectorized::Block* block, int channel_id,
+                       LocalExchangeSinkLocalState* local_state);
+    Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
+                       vectorized::Block* block, int channel_id);
+    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
 };
 
 class BucketShuffleExchanger final : public ShuffleExchanger {
@@ -255,13 +280,13 @@ public:
         _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
-    void close(LocalExchangeSourceLocalState& local_state) override;
+    void close(SourceInfo&& source_info) override;
 };
 
 class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
@@ -273,13 +298,13 @@ public:
         _data_queue.resize(num_partitions);
     }
     ~PassToOneExchanger() override = default;
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
-    void close(LocalExchangeSourceLocalState& local_state) override;
+    void close(SourceInfo&& source_info) override;
 };
 
 class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
@@ -292,17 +317,17 @@ public:
         _data_queue.resize(num_partitions);
     }
     ~LocalMergeSortExchanger() override = default;
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::LOCAL_MERGE_SORT; }
 
-    Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& local_state);
+    Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState* local_state);
 
-    void close(LocalExchangeSourceLocalState& local_state) override {}
-    void finalize(LocalExchangeSourceLocalState& local_state) override;
+    void close(SourceInfo&& source_info) override {}
+    void finalize() override;
 
 private:
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
@@ -318,13 +343,13 @@ public:
         _data_queue.resize(num_partitions);
     }
     ~BroadcastExchanger() override = default;
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
-    void close(LocalExchangeSourceLocalState& local_state) override;
+    void close(SourceInfo&& source_info) override;
 };
 
 //The code in AdaptivePassthroughExchanger is essentially
@@ -337,26 +362,28 @@ public:
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions,
                                           free_block_limit) {
         _data_queue.resize(num_partitions);
+        _partition_rows_histogram.resize(running_sink_operators);
     }
-    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
-                LocalExchangeSinkLocalState& local_state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
+                SinkInfo&& sink_info) override;
 
-    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
-                     LocalExchangeSourceLocalState& local_state) override;
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile,
+                     SourceInfo&& source_info) override;
     ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
 
-    void close(LocalExchangeSourceLocalState& local_state) override;
+    void close(SourceInfo&& source_info) override;
 
 private:
     Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block,
-                             LocalExchangeSinkLocalState& local_state);
-    Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block,
-                         LocalExchangeSinkLocalState& local_state);
+                             SinkInfo&& sink_info);
+    Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info);
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
-                       vectorized::Block* block, LocalExchangeSinkLocalState& local_state);
+                       vectorized::Block* block, SinkInfo&& sink_info);
+                       vectorized::Block* block, SinkInfo&& sink_info);
 
     std::atomic_bool _is_pass_through = false;
     std::atomic_int32_t _total_block = 0;
+    std::vector<std::vector<uint32_t>> _partition_rows_histogram;
 };
 #include "common/compile_check_end.h"
-} // namespace doris::pipeline
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file

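A note on the `_split_rows` variants above: the histogram loop (now stored per
sink channel in `_partition_rows_histogram`) is a counting sort over partition
ids. A standalone sketch of the same idea, with illustrative names and plain
std::vector in place of Doris's PODArray:

    #include <cstdint>
    #include <vector>

    // Given channel_ids[i] = target partition of row i, fill row_idx so that
    // the rows of partition p occupy row_idx[hist[p] .. hist[p + 1]).
    std::vector<uint32_t> group_rows_by_partition(const std::vector<uint32_t>& channel_ids,
                                                  uint32_t num_partitions,
                                                  std::vector<uint32_t>& hist) {
        const auto rows = static_cast<int32_t>(channel_ids.size());
        std::vector<uint32_t> row_idx(channel_ids.size());
        hist.assign(num_partitions + 1, 0);
        for (int32_t i = 0; i < rows; ++i) {
            hist[channel_ids[i]]++;              // count rows per partition
        }
        for (uint32_t p = 1; p <= num_partitions; ++p) {
            hist[p] += hist[p - 1];              // prefix sums: end offsets
        }
        for (int32_t i = rows - 1; i >= 0; --i) {
            row_idx[--hist[channel_ids[i]]] = i; // stable fill from the back
        }
        return row_idx;                          // hist[p] is now the start of p
    }

For example, channel_ids = {1, 0, 1, 2} with three partitions gives
row_idx = {1, 0, 2, 3} and hist = {0, 1, 3, 4}: each partition's rows form one
contiguous slice, which is what the exchanger enqueues per channel as
{new_block_wrapper, {row_idx, start, size}}.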

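Likewise, the new `_enqueue_data_and_set_ready(int channel_id, BlockType&&)`
overload (the path taken when no sink local state is attached) makes the
ref-counting discipline explicit: take a reference before publishing a block
to a queue, and give it back if the enqueue is rejected because the consumer
already closed. A minimal sketch of that pattern, with hypothetical RefBlock
and Queue types rather than the Doris classes:

    #include <atomic>

    // Stand-in for BlockWrapper: intrusive ref count; the last unref would
    // recycle the payload into a free list or release it.
    struct RefBlock {
        std::atomic<int> ref_count {0};
        void ref(int n = 1) { ref_count.fetch_add(n); }
        void unref() {
            if (ref_count.fetch_sub(1) == 1) {
                // last owner: reclaim the block here
            }
        }
    };

    // Publish to a queue whose consumer may already have closed it.
    template <typename Queue>
    void publish(Queue& queue, RefBlock* block) {
        block->ref();                 // the queue becomes an owner
        if (!queue.enqueue(block)) {  // false iff the source operator closed
            block->unref();           // roll the ownership back
        }
    }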
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
