This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d1e8bebb3c6 [refactor](shuffle) (PART I) Unify all hash-partition 
based shuffling (#45256)
d1e8bebb3c6 is described below

commit d1e8bebb3c610bc729b0a088fd693f013aeb5336
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Dec 11 14:20:41 2024 +0800

    [refactor](shuffle) (PART I) Unify all hash-partition based shuffling 
(#45256)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 308 +++++----------------
 be/src/pipeline/exec/exchange_sink_operator.h      |  58 +---
 be/src/pipeline/shuffle/writer.cpp                 | 114 ++++++++
 be/src/pipeline/shuffle/writer.h                   |  53 ++++
 be/src/vec/runtime/partitioner.h                   |  16 +-
 .../sink/scale_writer_partitioning_exchanger.hpp   |  92 ++++--
 be/src/vec/sink/tablet_sink_hash_partitioner.cpp   | 152 ++++++++++
 be/src/vec/sink/tablet_sink_hash_partitioner.h     |  83 ++++++
 be/src/vec/sink/vdata_stream_sender.cpp            |  12 +-
 be/src/vec/sink/vdata_stream_sender.h              |   8 +-
 10 files changed, 562 insertions(+), 334 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 04b9653e9c8..aa893fc0a26 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -38,6 +38,7 @@
 #include "util/uid_util.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/sink/tablet_sink_hash_partitioner.h"
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
@@ -120,6 +121,57 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
         _sink_buffer->set_dependency(state->fragment_instance_id().lo, 
_queue_dependency, this);
     }
 
+    if (_part_type == TPartitionType::HASH_PARTITIONED) {
+        _partition_count = channels.size();
+        _partitioner =
+                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
+        RETURN_IF_ERROR(_partitioner->init(p._texprs));
+        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
+    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        _partition_count = channels.size();
+        _partitioner =
+                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
+        RETURN_IF_ERROR(_partitioner->init(p._texprs));
+        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
+    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        _partition_count = channels.size();
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
+        _partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>(
+                _partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema,
+                p._tablet_sink_partition, p._tablet_sink_location, 
p._tablet_sink_tuple_id, this);
+        RETURN_IF_ERROR(_partitioner->init({}));
+        RETURN_IF_ERROR(_partitioner->prepare(state, {}));
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        _partition_count =
+                channels.size() * 
config::table_sink_partition_write_max_partition_nums_per_writer;
+        _partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
+                channels.size(), _partition_count, channels.size(), 1,
+                
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
+                                        state->task_num() ==
+                                0
+                        ? 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
+                        : 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
+                                  state->task_num(),
+                
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+                                        state->task_num() ==
+                                0
+                        ? 
config::table_sink_partition_write_min_data_processed_rebalance_threshold
+                        : 
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
+                                  state->task_num());
+
+        RETURN_IF_ERROR(_partitioner->init(p._texprs));
+        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
+    }
+
     return Status::OK();
 }
 
@@ -145,6 +197,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
+    _writer.reset(new Writer());
     auto& p = _parent->cast<ExchangeSinkOperatorX>();
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
@@ -190,113 +243,16 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) 
{
             }
         }
     }
-    if (_part_type == TPartitionType::HASH_PARTITIONED) {
-        _partition_count = channels.size();
-        _partitioner =
-                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
-                        channels.size());
-        RETURN_IF_ERROR(_partitioner->init(p._texprs));
-        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
-        _profile->add_info_string("Partitioner",
-                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
-    } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        _partition_count = channels.size();
-        _partitioner =
-                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
-                        channels.size());
-        RETURN_IF_ERROR(_partitioner->init(p._texprs));
-        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
-        _profile->add_info_string("Partitioner",
-                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
-    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
-        _partition_count = channels.size();
-        _profile->add_info_string("Partitioner",
-                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
-        _txn_id = p._tablet_sink_txn_id;
-        _schema = std::make_shared<OlapTableSchemaParam>();
-        RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
-        _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, 
p._tablet_sink_partition);
-        RETURN_IF_ERROR(_vpartition->init());
-        auto find_tablet_mode = 
vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
-        _tablet_finder =
-                
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), 
find_tablet_mode);
-        _tablet_sink_tuple_desc = 
_state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
-        _tablet_sink_row_desc = p._pool->add(new 
RowDescriptor(_tablet_sink_tuple_desc, false));
-        _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
-        for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
-            RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, 
_tablet_sink_expr_ctxs[i]));
-        }
-        // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, 
we handle the processing of auto_increment column
-        // on exchange node rather than on TabletWriter
-        _block_convertor =
-                
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
-        _block_convertor->init_autoinc_info(_schema->db_id(), 
_schema->table_id(),
-                                            _state->batch_size());
-        _location = p._pool->add(new 
OlapTableLocationParam(p._tablet_sink_location));
-        _row_distribution.init(
-                {.state = _state,
-                 .block_convertor = _block_convertor.get(),
-                 .tablet_finder = _tablet_finder.get(),
-                 .vpartition = _vpartition.get(),
-                 .add_partition_request_timer = _add_partition_request_timer,
-                 .txn_id = _txn_id,
-                 .pool = p._pool.get(),
-                 .location = _location,
-                 .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
-                 .schema = _schema,
-                 .caller = (void*)this,
-                 .create_partition_callback = 
&ExchangeSinkLocalState::empty_callback_function});
-    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
-        _partition_count =
-                channels.size() * 
config::table_sink_partition_write_max_partition_nums_per_writer;
-        _partitioner =
-                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
-                        _partition_count);
-        _partition_function = 
std::make_unique<HashPartitionFunction>(_partitioner.get());
-
-        scale_writer_partitioning_exchanger = std::make_unique<
-                
vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
-                channels.size(), *_partition_function, _partition_count, 
channels.size(), 1,
-                
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
-                                        state->task_num() ==
-                                0
-                        ? 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
-                        : 
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
 /
-                                  state->task_num(),
-                
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
-                                        state->task_num() ==
-                                0
-                        ? 
config::table_sink_partition_write_min_data_processed_rebalance_threshold
-                        : 
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
-                                  state->task_num());
-
-        RETURN_IF_ERROR(_partitioner->init(p._texprs));
-        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
-        _profile->add_info_string("Partitioner",
-                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
-    }
 
     if (_part_type == TPartitionType::HASH_PARTITIONED ||
         _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
-        _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
+        _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         RETURN_IF_ERROR(_partitioner->open(state));
-    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
-        RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
     }
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* 
input_block) {
-    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-    auto& p = _parent->cast<ExchangeSinkOperatorX>();
-    // Recovery back
-    _row_distribution.clear_batching_stats();
-    _row_distribution._batching_block->clear_column_data();
-    _row_distribution._deal_batched = false;
-    RETURN_IF_ERROR(p.sink(_state, input_block, false));
-    return Status::OK();
-}
-
 std::string ExchangeSinkLocalState::name_suffix() {
     std::string name = " (id=" + std::to_string(_parent->node_id());
     auto& p = _parent->cast<ExchangeSinkOperatorX>();
@@ -491,105 +447,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         local_state.current_channel_idx =
                 (local_state.current_channel_idx + 1) % 
local_state.channels.size();
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
-               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        auto rows = block->rows();
-        {
-            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
-            RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
-        }
-        int64_t old_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            old_channel_mem_usage += channel->mem_usage();
-        }
-        if (_part_type == TPartitionType::HASH_PARTITIONED) {
-            SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
-            RETURN_IF_ERROR(channel_add_rows(
-                    state, local_state.channels, local_state._partition_count,
-                    
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
-        } else {
-            SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
-            RETURN_IF_ERROR(channel_add_rows(
-                    state, local_state.channels, local_state._partition_count,
-                    
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
-        }
-        int64_t new_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            new_channel_mem_usage += channel->mem_usage();
-        }
-        COUNTER_UPDATE(local_state.memory_used_counter(),
-                       new_channel_mem_usage - old_channel_mem_usage);
-    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
-        int64_t old_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            old_channel_mem_usage += channel->mem_usage();
-        }
-        // check out of limit
-        std::shared_ptr<vectorized::Block> convert_block = 
std::make_shared<vectorized::Block>();
-        const auto& num_channels = local_state._partition_count;
-        std::vector<std::vector<uint32>> channel2rows;
-        channel2rows.resize(num_channels);
-        auto input_rows = block->rows();
-
-        if (input_rows > 0) {
-            bool has_filtered_rows = false;
-            int64_t filtered_rows = 0;
-            local_state._number_input_rows += input_rows;
-
-            
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
-                    *block, convert_block, filtered_rows, has_filtered_rows,
-                    local_state._row_part_tablet_ids, 
local_state._number_input_rows));
-            if (local_state._row_distribution.batching_rows() > 0) {
-                SCOPED_TIMER(local_state._send_new_partition_timer);
-                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
-            } else {
-                const auto& row_ids = 
local_state._row_part_tablet_ids[0].row_ids;
-                const auto& tablet_ids = 
local_state._row_part_tablet_ids[0].tablet_ids;
-                for (int idx = 0; idx < row_ids.size(); ++idx) {
-                    const auto& row = row_ids[idx];
-                    const auto& tablet_id_hash =
-                            HashUtil::zlib_crc_hash(&tablet_ids[idx], 
sizeof(int64), 0);
-                    channel2rows[tablet_id_hash % 
num_channels].emplace_back(row);
-                }
-            }
-        }
-
-        {
-            SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
-            // the convert_block maybe different with block after execute exprs
-            // when send data we still use block
-            RETURN_IF_ERROR(channel_add_rows_with_idx(state, 
local_state.channels, num_channels,
-                                                      channel2rows, block, 
eos));
-        }
-        int64_t new_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            new_channel_mem_usage += channel->mem_usage();
-        }
-        COUNTER_UPDATE(local_state.memory_used_counter(),
-                       new_channel_mem_usage - old_channel_mem_usage);
-    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
-        int64_t old_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            old_channel_mem_usage += channel->mem_usage();
-        }
-        {
-            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
-            RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
-        }
-        std::vector<std::vector<uint32>> assignments =
-                local_state.scale_writer_partitioning_exchanger->accept(block);
-        {
-            SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
-            RETURN_IF_ERROR(channel_add_rows_with_idx(state, 
local_state.channels,
-                                                      
local_state.channels.size(), assignments,
-                                                      block, eos));
-        }
-
-        int64_t new_channel_mem_usage = 0;
-        for (const auto& channel : local_state.channels) {
-            new_channel_mem_usage += channel->mem_usage();
-        }
-        COUNTER_UPDATE(local_state.memory_used_counter(),
-                       new_channel_mem_usage - old_channel_mem_usage);
+               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+               _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
+               _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, 
eos));
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
         // 1. select channel
@@ -641,44 +502,6 @@ void 
ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buf
     }
 }
 
-Status ExchangeSinkOperatorX::channel_add_rows(
-        RuntimeState* state, 
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
-        size_t num_channels, const uint32_t* __restrict channel_ids, size_t 
rows,
-        vectorized::Block* block, bool eos) {
-    std::vector<std::vector<uint32_t>> channel2rows;
-    channel2rows.resize(num_channels);
-    for (uint32_t i = 0; i < rows; i++) {
-        channel2rows[channel_ids[i]].emplace_back(i);
-    }
-
-    RETURN_IF_ERROR(
-            channel_add_rows_with_idx(state, channels, num_channels, 
channel2rows, block, eos));
-    return Status::OK();
-}
-
-Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
-        RuntimeState* state, 
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
-        size_t num_channels, std::vector<std::vector<uint32_t>>& channel2rows,
-        vectorized::Block* block, bool eos) {
-    Status status = Status::OK();
-    for (int i = 0; i < num_channels; ++i) {
-        if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
-            status = channels[i]->add_rows(block, channel2rows[i], false);
-            HANDLE_CHANNEL_STATUS(state, channels[i], status);
-            channel2rows[i].clear();
-        }
-    }
-    if (eos) {
-        for (int i = 0; i < num_channels; ++i) {
-            if (!channels[i]->is_receiver_eof()) {
-                status = channels[i]->add_rows(block, channel2rows[i], true);
-                HANDLE_CHANNEL_STATUS(state, channels[i], status);
-            }
-        }
-    }
-    return Status::OK();
-}
-
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
@@ -697,17 +520,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
-    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED &&
-        _block_convertor != nullptr && _tablet_finder != nullptr) {
-        
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-                                              
_tablet_finder->num_filtered_rows());
-        _state->update_num_rows_load_unselected(
-                _tablet_finder->num_immutable_partition_filtered_rows());
-        // sink won't see those filtered rows, we should compensate here
-        _state->set_num_rows_load_total(_state->num_rows_load_filtered() +
-                                        _state->num_rows_load_unselected());
-    }
     SCOPED_TIMER(exec_time_counter());
+    if (_partitioner) {
+        RETURN_IF_ERROR(_partitioner->close(state));
+    }
     SCOPED_TIMER(_close_timer);
     if (_queue_dependency) {
         COUNTER_UPDATE(_wait_queue_timer, 
_queue_dependency->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d094b43f61..e88389b1d7b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
 #include "operator.h"
+#include "pipeline/shuffle/writer.h"
 #include "vec/sink/scale_writer_partitioning_exchanger.hpp"
 #include "vec/sink/vdata_stream_sender.h"
 
@@ -39,20 +40,6 @@ class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
     using Base = PipelineXSinkLocalState<>;
 
-private:
-    class HashPartitionFunction {
-    public:
-        HashPartitionFunction(vectorized::PartitionerBase* partitioner)
-                : _partitioner(partitioner) {}
-
-        int get_partition(vectorized::Block* block, int position) {
-            return _partitioner->get_channel_ids().get<uint32_t>()[position];
-        }
-
-    private:
-        vectorized::PartitionerBase* _partitioner;
-    };
-
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state), _serializer(this) {
@@ -106,19 +93,20 @@ public:
     std::string name_suffix() override;
     segment_v2::CompressionTypePB compression_type() const;
     std::string debug_string(int indentation_level) const override;
-    static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
-        return Status::OK();
+    RuntimeProfile::Counter* send_new_partition_timer() { return 
_send_new_partition_timer; }
+    RuntimeProfile::Counter* add_partition_request_timer() { return 
_add_partition_request_timer; }
+    RuntimeProfile::Counter* split_block_hash_compute_timer() {
+        return _split_block_hash_compute_timer;
+    }
+    RuntimeProfile::Counter* distribute_rows_into_channels_timer() {
+        return _distribute_rows_into_channels_timer;
     }
-    Status _send_new_partition_batch(vectorized::Block* input_block);
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if 
_random == true
     bool only_local_exchange {false};
 
     void on_channel_finished(InstanceLoId channel_id);
-
-    // for external table sink hash partition
-    
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
-            scale_writer_partitioning_exchanger;
+    vectorized::PartitionerBase* partitioner() const { return 
_partitioner.get(); }
 
 private:
     friend class ExchangeSinkOperatorX;
@@ -176,28 +164,16 @@ private:
      */
     std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
+    std::unique_ptr<Writer> _writer;
     size_t _partition_count;
 
     std::shared_ptr<Dependency> _finish_dependency;
 
     // for shuffle data by partition and tablet
-    int64_t _txn_id = -1;
-    vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
-    std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
-    std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
-    std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
-    std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor = 
nullptr;
-    TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
-    RowDescriptor* _tablet_sink_row_desc = nullptr;
-    OlapTableLocationParam* _location = nullptr;
-    vectorized::VRowDistribution _row_distribution;
+
     RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
-    std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
-    int64_t _number_input_rows = 0;
     TPartitionType::type _part_type;
 
-    // for external table sink hash partition
-    std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
     std::atomic<bool> _reach_limit = false;
     int _last_local_channel_idx = -1;
 
@@ -230,6 +206,7 @@ public:
     // In a merge sort scenario, there are only n RPCs, so a shared sink 
buffer is not needed.
     /// TODO: Modify this to let FE handle the judgment instead of BE.
     std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId 
sender_ins_id);
+    vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return 
_tablet_sink_expr_ctxs; }
 
 private:
     friend class ExchangeSinkLocalState;
@@ -237,17 +214,6 @@ private:
     template <typename ChannelPtrType>
     void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st);
 
-    Status channel_add_rows(RuntimeState* state,
-                            std::vector<std::shared_ptr<vectorized::Channel>>& 
channels,
-                            size_t num_channels, const uint32_t* __restrict 
channel_ids,
-                            size_t rows, vectorized::Block* block, bool eos);
-
-    Status channel_add_rows_with_idx(RuntimeState* state,
-                                     
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
-                                     size_t num_channels,
-                                     std::vector<std::vector<uint32_t>>& 
channel2rows,
-                                     vectorized::Block* block, bool eos);
-
     // Use ExchangeSinkOperatorX to create a sink buffer.
     // The sink buffer can be shared among multiple ExchangeSinkLocalState 
instances,
     // or each ExchangeSinkLocalState can have its own sink buffer.
diff --git a/be/src/pipeline/shuffle/writer.cpp 
b/be/src/pipeline/shuffle/writer.cpp
new file mode 100644
index 00000000000..c27fd9a7aeb
--- /dev/null
+++ b/be/src/pipeline/shuffle/writer.cpp
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "writer.h"
+
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "vec/core/block.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+template <typename ChannelPtrType>
+void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st) const {
+    channel->set_receiver_eof(st);
+    // Chanel will not send RPC to the downstream when eof, so close chanel by 
OK status.
+    static_cast<void>(channel->close(state));
+}
+
+Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
+                     vectorized::Block* block, bool eos) const {
+    auto rows = block->rows();
+    {
+        SCOPED_TIMER(local_state->split_block_hash_compute_timer());
+        RETURN_IF_ERROR(local_state->partitioner()->do_partitioning(state, 
block));
+    }
+    int64_t old_channel_mem_usage = 0;
+    for (const auto& channel : local_state->channels) {
+        old_channel_mem_usage += channel->mem_usage();
+    }
+    {
+        SCOPED_TIMER(local_state->distribute_rows_into_channels_timer());
+        const auto& channel_filed = 
local_state->partitioner()->get_channel_ids();
+        if (channel_filed.len == sizeof(uint32_t)) {
+            RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels,
+                                              local_state->channels.size(),
+                                              channel_filed.get<uint32_t>(), 
rows, block, eos));
+        } else {
+            RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels,
+                                              local_state->channels.size(),
+                                              channel_filed.get<int64_t>(), 
rows, block, eos));
+        }
+    }
+    int64_t new_channel_mem_usage = 0;
+    for (const auto& channel : local_state->channels) {
+        new_channel_mem_usage += channel->mem_usage();
+    }
+    COUNTER_UPDATE(local_state->memory_used_counter(),
+                   new_channel_mem_usage - old_channel_mem_usage);
+    return Status::OK();
+}
+
+template <typename ChannelIdType>
+Status Writer::_channel_add_rows(RuntimeState* state,
+                                 
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
+                                 size_t partition_count,
+                                 const ChannelIdType* __restrict channel_ids, 
size_t rows,
+                                 vectorized::Block* block, bool eos) const {
+    std::vector<uint32_t> partition_rows_histogram;
+    auto row_idx = vectorized::PODArray<uint32_t>(rows);
+    {
+        partition_rows_histogram.assign(partition_count + 2, 0);
+        for (size_t i = 0; i < rows; ++i) {
+            partition_rows_histogram[channel_ids[i] + 1]++;
+        }
+        for (size_t i = 1; i <= partition_count + 1; ++i) {
+            partition_rows_histogram[i] += partition_rows_histogram[i - 1];
+        }
+        for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) {
+            row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i;
+            partition_rows_histogram[channel_ids[i] + 1]--;
+        }
+    }
+#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
+    do {                                                 \
+        if (status.is<ErrorCode::END_OF_FILE>()) {       \
+            _handle_eof_channel(state, channel, status); \
+        } else {                                         \
+            RETURN_IF_ERROR(status);                     \
+        }                                                \
+    } while (0)
+    Status status = Status::OK();
+    for (size_t i = 0; i < partition_count; ++i) {
+        uint32_t start = partition_rows_histogram[i + 1];
+        uint32_t size = partition_rows_histogram[i + 2] - start;
+        if (!channels[i]->is_receiver_eof() && size > 0) {
+            status = channels[i]->add_rows(block, row_idx.data(), start, size, 
false);
+            HANDLE_CHANNEL_STATUS(state, channels[i], status);
+        }
+    }
+    if (eos) {
+        for (int i = 0; i < partition_count; ++i) {
+            if (!channels[i]->is_receiver_eof()) {
+                status = channels[i]->add_rows(block, row_idx.data(), 0, 0, 
true);
+                HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h
new file mode 100644
index 00000000000..0eb77212029
--- /dev/null
+++ b/be/src/pipeline/shuffle/writer.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+class RuntimeState;
+class Status;
+namespace vectorized {
+class Block;
+class Channel;
+} // namespace vectorized
+namespace pipeline {
+
+#include "common/compile_check_begin.h"
+class ExchangeSinkLocalState;
+
+// Stateless helper used by the exchange sink to push the rows of a block into
+// the per-partition channels, based on channel ids produced by a partitioner.
+class Writer {
+public:
+    Writer() = default;
+
+    // Sends `block` on behalf of `local_state`.
+    // NOTE(review): presumably runs the local state's partitioner and then
+    // forwards to _channel_add_rows(); confirm against writer.cpp.
+    Status write(ExchangeSinkLocalState* local_state, RuntimeState* state, 
vectorized::Block* block,
+                 bool eos) const;
+
+private:
+    // Groups the `rows` row indexes of `block` by `channel_ids[i]` and hands each
+    // non-empty group to the matching entry of `channels` via add_rows(). When
+    // `eos` is set, every channel whose receiver is not already at EOF gets a
+    // final empty add_rows() call carrying the eos flag.
+    // ChannelIdType is the element type of the partitioner's channel-id array.
+    template <typename ChannelIdType>
+    Status _channel_add_rows(RuntimeState* state,
+                             
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
+                             size_t partition_count, const ChannelIdType* 
__restrict channel_ids,
+                             size_t rows, vectorized::Block* block, bool eos) 
const;
+
+    // Invoked when a channel's add_rows() reports END_OF_FILE, instead of
+    // propagating that status as an error.
+    template <typename ChannelPtrType>
+    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st) const;
+};
+#include "common/compile_check_end.h"
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index d5492c3be87..53d8b84d09c 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -21,11 +21,8 @@
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
-namespace doris {
+namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-
-namespace vectorized {
-
 struct ChannelField {
     const void* channel_id;
     const uint32_t len;
@@ -48,12 +45,16 @@ public:
 
     virtual Status open(RuntimeState* state) = 0;
 
+    virtual Status close(RuntimeState* state) = 0;
+
     virtual Status do_partitioning(RuntimeState* state, Block* block) const = 
0;
 
     virtual ChannelField get_channel_ids() const = 0;
 
     virtual Status clone(RuntimeState* state, 
std::unique_ptr<PartitionerBase>& partitioner) = 0;
 
+    size_t partition_count() const { return _partition_count; }
+
 protected:
     const size_t _partition_count;
 };
@@ -74,6 +75,8 @@ public:
 
     Status open(RuntimeState* state) override { return 
VExpr::open(_partition_expr_ctxs, state); }
 
+    Status close(RuntimeState* state) override { return Status::OK(); }
+
     Status do_partitioning(RuntimeState* state, Block* block) const override;
 
     ChannelField get_channel_ids() const override { return {_hash_vals.data(), 
sizeof(uint32_t)}; }
@@ -108,8 +111,5 @@ struct SpillPartitionChannelIds {
         return ((l >> 16) | (l << 16)) % r;
     }
 };
-
-} // namespace vectorized
-} // namespace doris
-
 #include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp 
b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
index f7435249c20..92e52af4c67 100644
--- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
+++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
@@ -24,28 +24,51 @@
 
 #include "vec/core/block.h"
 #include "vec/exec/skewed_partition_rebalancer.h"
+#include "vec/runtime/partitioner.h"
 
 namespace doris::vectorized {
 
-template <typename PartitionFunction>
-class ScaleWriterPartitioningExchanger {
+class ScaleWriterPartitioner final : public PartitionerBase {
 public:
-    ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& 
partition_function,
-                                     int partition_count, int task_count, int 
task_bucket_count,
-                                     long 
min_partition_data_processed_rebalance_threshold,
-                                     long 
min_data_processed_rebalance_threshold)
-            : _channel_size(channel_size),
-              _partition_function(partition_function),
+    using HashValType = uint32_t;
+    ScaleWriterPartitioner(int channel_size, int partition_count, int 
task_count,
+                           int task_bucket_count,
+                           long 
min_partition_data_processed_rebalance_threshold,
+                           long min_data_processed_rebalance_threshold)
+            : PartitionerBase(partition_count),
+              _channel_size(channel_size),
               _partition_rebalancer(partition_count, task_count, 
task_bucket_count,
                                     
min_partition_data_processed_rebalance_threshold,
                                     min_data_processed_rebalance_threshold),
               _partition_row_counts(partition_count, 0),
               _partition_writer_ids(partition_count, -1),
-              _partition_writer_indexes(partition_count, 0) {}
+              _partition_writer_indexes(partition_count, 0),
+              _task_count(task_count),
+              _task_bucket_count(task_bucket_count),
+              _min_partition_data_processed_rebalance_threshold(
+                      min_partition_data_processed_rebalance_threshold),
+              
_min_data_processed_rebalance_threshold(min_data_processed_rebalance_threshold) 
{
+        _crc_partitioner =
+                
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        _partition_count);
+    }
+
+    ~ScaleWriterPartitioner() override = default;
+
+    Status init(const std::vector<TExpr>& texprs) override {
+        return _crc_partitioner->init(texprs);
+    }
+
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc) 
override {
+        return _crc_partitioner->prepare(state, row_desc);
+    }
+
+    Status open(RuntimeState* state) override { return 
_crc_partitioner->open(state); }
+
+    Status close(RuntimeState* state) override { return 
_crc_partitioner->close(state); }
 
-    std::vector<std::vector<uint32_t>> accept(Block* block) {
-        std::vector<std::vector<uint32_t>> writerAssignments(_channel_size,
-                                                             
std::vector<uint32_t>());
+    Status do_partitioning(RuntimeState* state, Block* block) const override {
+        _hash_vals.resize(block->rows());
         for (int partition_id = 0; partition_id < 
_partition_row_counts.size(); partition_id++) {
             _partition_row_counts[partition_id] = 0;
             _partition_writer_ids[partition_id] = -1;
@@ -53,40 +76,59 @@ public:
 
         _partition_rebalancer.rebalance();
 
-        for (int position = 0; position < block->rows(); position++) {
-            int partition_id = _partition_function.get_partition(block, 
position);
+        RETURN_IF_ERROR(_crc_partitioner->do_partitioning(state, block));
+        const auto* crc_values = 
_crc_partitioner->get_channel_ids().get<uint32_t>();
+        for (size_t position = 0; position < block->rows(); position++) {
+            int partition_id = crc_values[position];
             _partition_row_counts[partition_id] += 1;
 
             // Get writer id for this partition by looking at the scaling state
             int writer_id = _partition_writer_ids[partition_id];
             if (writer_id == -1) {
-                writer_id = get_next_writer_id(partition_id);
+                writer_id = _get_next_writer_id(partition_id);
                 _partition_writer_ids[partition_id] = writer_id;
             }
-            writerAssignments[writer_id].push_back(position);
+            _hash_vals[position] = writer_id;
         }
 
-        for (int partition_id = 0; partition_id < 
_partition_row_counts.size(); partition_id++) {
+        for (size_t partition_id = 0; partition_id < 
_partition_row_counts.size(); partition_id++) {
             _partition_rebalancer.add_partition_row_count(partition_id,
                                                           
_partition_row_counts[partition_id]);
         }
         _partition_rebalancer.add_data_processed(block->bytes());
 
-        return writerAssignments;
+        return Status::OK();
+    }
+
+    ChannelField get_channel_ids() const override {
+        return {_hash_vals.data(), sizeof(HashValType)};
     }
 
-    int get_next_writer_id(int partition_id) {
+    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& 
partitioner) override {
+        partitioner.reset(new ScaleWriterPartitioner(
+                _channel_size, _partition_count, _task_count, 
_task_bucket_count,
+                _min_partition_data_processed_rebalance_threshold,
+                _min_data_processed_rebalance_threshold));
+        return Status::OK();
+    }
+
+private:
+    int _get_next_writer_id(int partition_id) const {
         return _partition_rebalancer.get_task_id(partition_id,
                                                  
_partition_writer_indexes[partition_id]++);
     }
 
-private:
     int _channel_size;
-    PartitionFunction& _partition_function;
-    SkewedPartitionRebalancer _partition_rebalancer;
-    std::vector<int> _partition_row_counts;
-    std::vector<int> _partition_writer_ids;
-    std::vector<int> _partition_writer_indexes;
+    std::unique_ptr<PartitionerBase> _crc_partitioner;
+    mutable SkewedPartitionRebalancer _partition_rebalancer;
+    mutable std::vector<int> _partition_row_counts;
+    mutable std::vector<int> _partition_writer_ids;
+    mutable std::vector<int> _partition_writer_indexes;
+    mutable std::vector<HashValType> _hash_vals;
+    const int _task_count;
+    const int _task_bucket_count;
+    const long _min_partition_data_processed_rebalance_threshold;
+    const long _min_data_processed_rebalance_threshold;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.cpp 
b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp
new file mode 100644
index 00000000000..7e854d19add
--- /dev/null
+++ b/be/src/vec/sink/tablet_sink_hash_partitioner.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/tablet_sink_hash_partitioner.h"
+
+#include "pipeline/exec/operator.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+// Only records the thrift descriptors, the transaction id and the exchange sink
+// local state. The schema, tablet finder, block convertor and row distribution
+// are built later, in open().
+TabletSinkHashPartitioner::TabletSinkHashPartitioner(
+        size_t partition_count,
+        int64_t txn_id,
+        const TOlapTableSchemaParam& tablet_sink_schema,
+        const TOlapTablePartitionParam& tablet_sink_partition,
+        const TOlapTableLocationParam& tablet_sink_location,
+        const TTupleId& tablet_sink_tuple_id,
+        pipeline::ExchangeSinkLocalState* local_state)
+        : PartitionerBase(partition_count),
+          _txn_id(txn_id),
+          _tablet_sink_schema(tablet_sink_schema),
+          _tablet_sink_partition(tablet_sink_partition),
+          _tablet_sink_location(tablet_sink_location),
+          _tablet_sink_tuple_id(tablet_sink_tuple_id),
+          _local_state(local_state) {
+}
+
+// No-op: this partitioner takes no partition expressions, so the argument is
+// ignored and the call always succeeds.
+Status TabletSinkHashPartitioner::init(const std::vector<TExpr>& /*texprs*/) {
+    return Status::OK();
+}
+
+// No-op: nothing is prepared here; all setup happens in open(). Both arguments
+// are ignored and the call always succeeds.
+Status TabletSinkHashPartitioner::prepare(RuntimeState* /*state*/,
+                                          const RowDescriptor& /*row_desc*/) {
+    return Status::OK();
+}
+
+// Builds everything do_partitioning() relies on, in dependency order: schema,
+// partition parameters, tablet finder, tuple/row descriptors, cloned output
+// expression contexts, block convertor, tablet locations and finally the row
+// distribution helper. Returns the first error encountered.
+Status TabletSinkHashPartitioner::open(RuntimeState* state) {
+    // Schema and partition metadata come from the thrift descriptors captured
+    // by the constructor.
+    _schema = std::make_shared<OlapTableSchemaParam>();
+    RETURN_IF_ERROR(_schema->init(_tablet_sink_schema));
+    _vpartition = std::make_unique<VOlapTablePartitionParam>(_schema, _tablet_sink_partition);
+    RETURN_IF_ERROR(_vpartition->init());
+
+    // The finder is created in FIND_TABLET_EVERY_ROW mode.
+    _tablet_finder = std::make_unique<vectorized::OlapTabletFinder>(
+            _vpartition.get(),
+            vectorized::OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW);
+
+    // The row descriptor is handed to the state's object pool.
+    _tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id);
+    _tablet_sink_row_desc =
+            state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
+
+    // Give this partitioner its own clones of the parent operator's tablet sink
+    // expression contexts.
+    auto& parent_ctxs = _local_state->parent()
+                                ->cast<pipeline::ExchangeSinkOperatorX>()
+                                .tablet_sink_expr_ctxs();
+    _tablet_sink_expr_ctxs.resize(parent_ctxs.size());
+    for (size_t ctx_idx = 0; ctx_idx < _tablet_sink_expr_ctxs.size(); ++ctx_idx) {
+        RETURN_IF_ERROR(parent_ctxs[ctx_idx]->clone(state, _tablet_sink_expr_ctxs[ctx_idx]));
+    }
+
+    // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the
+    // processing of auto_increment column on exchange node rather than on TabletWriter
+    _block_convertor =
+            std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
+    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
+                                        state->batch_size());
+
+    // The location table is also handed to the state's object pool.
+    _location = state->obj_pool()->add(new OlapTableLocationParam(_tablet_sink_location));
+
+    // Wire the row distribution helper to the objects built above. Partition
+    // creation results are passed to a no-op callback.
+    _row_distribution.init(
+            {.state = state,
+             .block_convertor = _block_convertor.get(),
+             .tablet_finder = _tablet_finder.get(),
+             .vpartition = _vpartition.get(),
+             .add_partition_request_timer = _local_state->add_partition_request_timer(),
+             .txn_id = _txn_id,
+             .pool = state->obj_pool(),
+             .location = _location,
+             .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
+             .schema = _schema,
+             .caller = static_cast<void*>(this),
+             .create_partition_callback = &TabletSinkHashPartitioner::empty_callback_function});
+    return _row_distribution.open(_tablet_sink_row_desc);
+}
+
+// Computes one channel id per row of `block` and stores it in _hash_vals (read
+// back through get_channel_ids()).
+//
+// Every slot starts at -1. When the row distribution reports batched rows, the
+// new-partition path (_send_new_partition_batch) runs and this call assigns no
+// ids itself. Otherwise, each row listed in _row_part_tablet_ids[0] gets
+// crc32(tablet_id) % _partition_count; rows not listed there keep -1.
+//
+// Returns the first error from the row distribution or the new-partition path.
+Status TabletSinkHashPartitioner::do_partitioning(RuntimeState* state, Block* block) const {
+    // Size the result to the block and reset every entry in a single step.
+    _hash_vals.assign(block->rows(), -1);
+    if (block->empty()) {
+        return Status::OK();
+    }
+    bool has_filtered_rows = false;
+    int64_t filtered_rows = 0;
+    int64_t number_input_rows = _local_state->rows_input_counter()->value();
+    std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
+    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
+            *block, convert_block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
+            number_input_rows));
+    if (_row_distribution.batching_rows() > 0) {
+        SCOPED_TIMER(_local_state->send_new_partition_timer());
+        RETURN_IF_ERROR(_send_new_partition_batch(state, block));
+    } else {
+        const auto& row_ids = _row_part_tablet_ids[0].row_ids;
+        const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
+        // Unsigned index: row_ids.size() is unsigned, so a signed `int` counter
+        // would be a signed/unsigned comparison.
+        for (size_t idx = 0; idx < row_ids.size(); ++idx) {
+            // Hash exactly the bytes of the tablet id element. The length must
+            // follow the element type, not HashValType (the output type).
+            const auto tablet_id_hash =
+                    HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(tablet_ids[idx]), 0);
+            _hash_vals[row_ids[idx]] = tablet_id_hash % _partition_count;
+        }
+    }
+
+    return Status::OK();
+}
+
+// Exposes the ids written by the last do_partitioning() call as a raw pointer
+// plus the byte width of one element (HashValType).
+ChannelField TabletSinkHashPartitioner::get_channel_ids() const {
+    return ChannelField {_hash_vals.data(), sizeof(HashValType)};
+}
+
+// Replaces `partitioner` with a fresh TabletSinkHashPartitioner built from the
+// same constructor arguments as this one. State created by open() is not
+// copied. `state` is unused and the call always succeeds.
+Status TabletSinkHashPartitioner::clone(RuntimeState* state,
+                                        std::unique_ptr<PartitionerBase>& partitioner) {
+    partitioner = std::make_unique<TabletSinkHashPartitioner>(
+            _partition_count, _txn_id, _tablet_sink_schema, _tablet_sink_partition,
+            _tablet_sink_location, _tablet_sink_tuple_id, _local_state);
+    return Status::OK();
+}
+
+// Publishes the filtered / unselected row counters gathered by the block
+// convertor and the tablet finder into `state`. Does nothing when either of
+// them has not been created. Always succeeds.
+Status TabletSinkHashPartitioner::close(RuntimeState* state) {
+    if (_block_convertor == nullptr || _tablet_finder == nullptr) {
+        return Status::OK();
+    }
+
+    state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+                                         _tablet_finder->num_filtered_rows());
+    state->update_num_rows_load_unselected(
+            _tablet_finder->num_immutable_partition_filtered_rows());
+    // sink won't see those filtered rows, we should compensate here
+    state->set_num_rows_load_total(state->num_rows_load_filtered() +
+                                   state->num_rows_load_unselected());
+    return Status::OK();
+}
+
+// Runs automatic partition creation, resets the row distribution's batching
+// state, then feeds `input_block` through the parent operator's sink() again
+// with eos == false. Returns the first error from either step.
+Status TabletSinkHashPartitioner::_send_new_partition_batch(
+        RuntimeState* state, vectorized::Block* input_block) const {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& sink_operator = _local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>();
+
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+
+    return sink_operator.sink(state, input_block, false);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.h 
b/be/src/vec/sink/tablet_sink_hash_partitioner.h
new file mode 100644
index 00000000000..dbbc7c88926
--- /dev/null
+++ b/be/src/vec/sink/tablet_sink_hash_partitioner.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/tablet_info.h"
+#include "pipeline/exec/exchange_sink_operator.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/runtime/partitioner.h"
+#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+// Partitioner used when an exchange sink shuffles rows towards a tablet sink:
+// each row is assigned the channel crc32(tablet_id) % partition_count, where
+// the tablet id is resolved through the row distribution helper (see
+// do_partitioning() in tablet_sink_hash_partitioner.cpp).
+class TabletSinkHashPartitioner final : public PartitionerBase {
+public:
+    // Element type of the array returned by get_channel_ids(). Signed because
+    // do_partitioning() pre-fills every entry with -1 before assigning ids.
+    using HashValType = int64_t;
+    // Stores the arguments only; the working objects are created in open().
+    TabletSinkHashPartitioner(size_t partition_count, int64_t txn_id,
+                              const TOlapTableSchemaParam& tablet_sink_schema,
+                              const TOlapTablePartitionParam& 
tablet_sink_partition,
+                              const TOlapTableLocationParam& 
tablet_sink_location,
+                              const TTupleId& tablet_sink_tuple_id,
+                              pipeline::ExchangeSinkLocalState* local_state);
+
+    ~TabletSinkHashPartitioner() override = default;
+
+    // No-op; returns OK.
+    Status init(const std::vector<TExpr>& texprs) override;
+
+    // No-op; returns OK.
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc) 
override;
+
+    // Builds schema, tablet finder, block convertor and row distribution.
+    Status open(RuntimeState* state) override;
+
+    // Fills the per-row channel ids for `block`.
+    Status do_partitioning(RuntimeState* state, Block* block) const override;
+
+    // Pointer to the per-row ids plus sizeof(HashValType).
+    ChannelField get_channel_ids() const override;
+    // Creates a new, un-opened partitioner from the same constructor arguments.
+    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& 
partitioner) override;
+
+    // Reports filtered / unselected row counters to `state`.
+    Status close(RuntimeState* state) override;
+
+private:
+    // No-op handed to the row distribution as its create_partition_callback.
+    static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
+        return Status::OK();
+    }
+
+    // Creates the missing partitions, resets batching state and re-sinks the
+    // block through the parent operator.
+    Status _send_new_partition_batch(RuntimeState* state, vectorized::Block* 
input_block) const;
+
+    // Constructor arguments, kept so that open() and clone() can use them.
+    const int64_t _txn_id = -1;
+    const TOlapTableSchemaParam _tablet_sink_schema;
+    const TOlapTablePartitionParam _tablet_sink_partition;
+    const TOlapTableLocationParam _tablet_sink_location;
+    const TTupleId _tablet_sink_tuple_id;
+    // Exchange sink local state passed at construction; used for timers, the
+    // input-row counter and access to the parent operator.
+    mutable pipeline::ExchangeSinkLocalState* _local_state;
+    // Allocated in open() and added to the RuntimeState's object pool.
+    mutable OlapTableLocationParam* _location = nullptr;
+    // The members below are `mutable` because do_partitioning() is const but
+    // updates them on every call.
+    mutable vectorized::VRowDistribution _row_distribution;
+    // Clones of the parent operator's tablet sink expression contexts.
+    mutable vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
+    mutable std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
+    mutable std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = 
nullptr;
+    mutable std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
+    mutable std::unique_ptr<vectorized::OlapTableBlockConvertor> 
_block_convertor = nullptr;
+    // Looked up from the RuntimeState's descriptor table in open().
+    mutable TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
+    // Allocated in open() and added to the RuntimeState's object pool.
+    mutable RowDescriptor* _tablet_sink_row_desc = nullptr;
+    // Output of the row distribution; entry 0 supplies row ids and tablet ids.
+    mutable std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
+    // One channel id per input row; -1 where no id was assigned.
+    mutable std::vector<HashValType> _hash_vals;
+};
+#include "common/compile_check_end.h"
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 66aacc59f6c..abed6133458 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -239,18 +239,18 @@ 
BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool
         : _parent(parent), _is_local(is_local), 
_batch_size(parent->state()->batch_size()) {}
 
 Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, 
size_t num_receivers,
-                                              bool* serialized, bool eos,
-                                              const std::vector<uint32_t>* 
rows) {
+                                              bool* serialized, bool eos, 
const uint32_t* data,
+                                              const uint32_t offset, const 
uint32_t size) {
     if (_mutable_block == nullptr) {
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
     }
 
     {
         SCOPED_TIMER(_parent->merge_block_timer());
-        if (rows) {
-            if (!rows->empty()) {
-                const auto* begin = rows->data();
-                RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + 
rows->size()));
+        if (data) {
+            if (size > 0) {
+                RETURN_IF_ERROR(
+                        _mutable_block->add_rows(block, data + offset, data + 
offset + size));
             }
         } else if (!block->empty()) {
             RETURN_IF_ERROR(_mutable_block->merge(*block));
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 16ea49e443c..0ff1f252d54 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -80,7 +80,8 @@ public:
     BlockSerializer() : _batch_size(0) {};
 #endif
     Status next_serialized_block(Block* src, PBlock* dest, size_t 
num_receivers, bool* serialized,
-                                 bool eos, const std::vector<uint32_t>* rows = 
nullptr);
+                                 bool eos, const uint32_t* data = nullptr,
+                                 const uint32_t offset = 0, const uint32_t 
size = 0);
     Status serialize_block(PBlock* dest, size_t num_receivers = 1);
     Status serialize_block(const Block* src, PBlock* dest, size_t 
num_receivers = 1);
 
@@ -150,7 +151,8 @@ public:
     Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = 
false);
     Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos = false);
 
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
{
+    Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, 
const uint32_t size,
+                    bool eos) {
         if (_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
@@ -160,7 +162,7 @@ public:
             _pblock = std::make_unique<PBlock>();
         }
         RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized, eos,
-                                                          &rows));
+                                                          data, offset, size));
         if (serialized) {
             RETURN_IF_ERROR(_send_current_block(eos));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to