This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ef2151ae667 [Feature-WIP](multi-catalog) Add Hive sink on BE side. 
(#32306) (#32364)
ef2151ae667 is described below

commit ef2151ae667a923cae8c28d82be50d377430c3e0
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Mar 18 11:23:01 2024 +0800

    [Feature-WIP](multi-catalog) Add Hive sink on BE side. (#32306) (#32364)
    
    bp #32306
    Co-authored-by: Qi Chen <kaka11.c...@gmail.com>
---
 be/src/common/config.cpp                           |  15 +
 be/src/common/config.h                             |  13 +
 be/src/exec/data_sink.cpp                          |  16 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  73 +++-
 be/src/pipeline/exec/exchange_sink_operator.h      |  28 ++
 be/src/pipeline/exec/hive_table_sink_operator.cpp  |  50 +++
 be/src/pipeline/exec/hive_table_sink_operator.h    | 115 ++++++
 be/src/pipeline/pipeline_fragment_context.cpp      |  11 +-
 be/src/pipeline/pipeline_x/operator.cpp            |   3 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   9 +
 be/src/runtime/fragment_mgr.cpp                    |  18 +
 be/src/runtime/runtime_state.h                     |   4 +
 be/src/util/indexed_priority_queue.hpp             | 182 +++++++++
 be/src/vec/exec/skewed_partition_rebalancer.cpp    | 302 ++++++++++++++
 be/src/vec/exec/skewed_partition_rebalancer.h      | 132 +++++++
 be/src/vec/runtime/vorc_transformer.cpp            |  23 +-
 be/src/vec/runtime/vorc_transformer.h              |   6 +-
 .../sink/scale_writer_partitioning_exchanger.hpp   |  92 +++++
 be/src/vec/sink/vhive_table_sink.cpp               |  48 +++
 be/src/vec/sink/vhive_table_sink.h                 |  52 +++
 be/src/vec/sink/writer/vhive_partition_writer.cpp  | 282 ++++++++++++++
 be/src/vec/sink/writer/vhive_partition_writer.h    |  99 +++++
 be/src/vec/sink/writer/vhive_table_writer.cpp      | 432 +++++++++++++++++++++
 be/src/vec/sink/writer/vhive_table_writer.h        |  74 ++++
 be/src/vec/sink/writer/vhive_utils.cpp             |  78 ++++
 be/src/vec/sink/writer/vhive_utils.h               |  45 +++
 be/test/util/indexed_priority_queue_test.cpp       | 104 +++++
 .../vec/exec/skewed_partition_rebalancer_test.cpp  | 318 +++++++++++++++
 be/test/vec/exec/vhive_utils_test.cpp              |  70 ++++
 29 files changed, 2681 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dc31d2c621e..960d8f1c19a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1197,6 +1197,21 @@ DEFINE_mString(ca_cert_file_paths,
                
"/etc/pki/tls/certs/ca-bundle.crt;/etc/ssl/certs/ca-certificates.crt;"
                "/etc/ssl/ca-bundle.pem");
 
+/** Table sink configurations(currently contains only external table types) **/
+// Minimum data processed to scale writers when non partition writing
+DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
+              "125829120"); // 120MB
+// Minimum data processed to start rebalancing in exchange when partition 
writing
+DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, 
"209715200"); // 200MB
+// Minimum data processed to trigger skewed partition rebalancing in exchange 
when partition writing
+DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
+              "209715200"); // 200MB
+// Maximum processed partition nums of per writer when partition writing
+DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");
+
+/** Hive sink configurations **/
+DEFINE_mInt64(hive_sink_max_file_size, "1073741824"); // 1GB
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index abb833f42e8..85db11b9e61 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1271,6 +1271,19 @@ DECLARE_String(tmp_file_dir);
 // the file paths(one or more) of CA cert, splite using ";" aws s3 lib use it 
to init s3client
 DECLARE_mString(ca_cert_file_paths);
 
+/** Table sink configurations(currently contains only external table types) **/
+// Minimum data processed to scale writers when non partition writing
+DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
+// Minimum data processed to start rebalancing in exchange when partition 
writing
+DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
+// Minimum data processed to trigger skewed partition rebalancing in exchange 
when partition writing
+DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
+// Maximum processed partition nums of per writer when partition writing
+DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);
+
+/** Hive sink configurations **/
+DECLARE_mInt64(hive_sink_max_file_size); // 1GB
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 5047ae8fc78..d373ddd2fcd 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -36,6 +36,7 @@
 #include "vec/sink/group_commit_block_sink.h"
 #include "vec/sink/multi_cast_data_stream_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
+#include "vec/sink/vhive_table_sink.h"
 #include "vec/sink/vmemory_scratch_sink.h"
 #include "vec/sink/volap_table_sink.h"
 #include "vec/sink/volap_table_sink_v2.h"
@@ -157,6 +158,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
         break;
     }
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        if (!thrift_sink.__isset.hive_table_sink) {
+            return Status::InternalError("Missing hive table sink.");
+        }
+        sink->reset(new vectorized::VHiveTableSink(pool, row_desc, 
output_exprs));
+        break;
+    }
     case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
         Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
@@ -298,6 +306,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         }
         break;
     }
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        if (!thrift_sink.__isset.hive_table_sink) {
+            return Status::InternalError("Missing hive table sink.");
+        }
+        sink->reset(new vectorized::VHiveTableSink(pool, row_desc, 
output_exprs));
+        break;
+    }
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
         DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
         DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
@@ -313,6 +328,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
         RETURN_IF_ERROR(status);
         break;
     }
+
     default: {
         std::stringstream error_msg;
         std::map<int, const char*>::const_iterator i =
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 333abdbcc01..9c3d68c472f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -157,7 +157,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             local_size++;
         }
     }
-    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM) {
+    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
+        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         std::random_device rd;
         std::mt19937 g(rd());
         shuffle(channels.begin(), channels.end(), g);
@@ -249,6 +250,27 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                  .schema = _schema,
                  .caller = (void*)this,
                  .create_partition_callback = 
&ExchangeSinkLocalState::empty_callback_function});
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        _partition_count =
+                channels.size() * 
config::table_sink_partition_write_max_partition_nums_per_writer;
+        _partitioner.reset(
+                new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
+        _partition_function.reset(new 
HashPartitionFunction(_partitioner.get()));
+        //        const long MEGABYTE = 1024 * 1024;
+        //        const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD 
= 10000 * MEGABYTE; // 1MB
+        //        const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * 
MEGABYTE;           // 50MB
+
+        //        const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD 
= 1; // 1MB
+        //        const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1;       
    // 50MB
+        scale_writer_partitioning_exchanger.reset(new 
vectorized::ScaleWriterPartitioningExchanger<
+                                                  HashPartitionFunction>(
+                channels.size(), *_partition_function, _partition_count, 
channels.size(), 1,
+                config::table_sink_partition_write_data_processed_threshold,
+                
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold));
+        RETURN_IF_ERROR(_partitioner->init(p._texprs));
+        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     }
 
     _finish_dependency->block();
@@ -259,7 +281,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
 Status ExchangeSinkLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(Base::open(state));
     if (_part_type == TPartitionType::HASH_PARTITIONED ||
-        _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+        _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         RETURN_IF_ERROR(_partitioner->open(state));
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
@@ -320,7 +343,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
            sink.output_partition.type == TPartitionType::RANDOM ||
            sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
-           sink.output_partition.type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
+           sink.output_partition.type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+           sink.output_partition.type == 
TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
+           sink.output_partition.type == 
TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
     _name = "ExchangeSinkOperatorX";
     _pool = std::make_shared<ObjectPool>();
 }
@@ -492,7 +517,47 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, 
num_channels,
                                                   channel2rows, 
convert_block.get(), eos));
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        {
+            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
+            RETURN_IF_ERROR(
+                    local_state._partitioner->do_partitioning(state, block, 
_mem_tracker.get()));
+        }
+        std::vector<std::vector<uint32>> assignments =
+                local_state.scale_writer_partitioning_exchanger->accept(block);
+        RETURN_IF_ERROR(channel_add_rows_with_idx(
+                state, local_state.channels, local_state.channels.size(), 
assignments, block, eos));
 
+    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
+        // 1. select channel
+        vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+                local_state.channels[local_state.current_channel_idx];
+        if (!current_channel->is_receiver_eof()) {
+            // 2. serialize, send and rollover block
+            if (current_channel->is_local()) {
+                auto status = current_channel->send_local_block(block);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+            } else {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(
+                        block, current_channel->ch_cur_pb_block()));
+                auto status =
+                        
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+                current_channel->ch_roll_pb_block();
+            }
+            _data_processed += block->bytes();
+        }
+
+        if (_writer_count < local_state.channels.size()) {
+            if (_data_processed >=
+                _writer_count *
+                        
config::table_sink_non_partition_write_scaling_data_processed_threshold) {
+                _writer_count++;
+            }
+        }
+        local_state.current_channel_idx = (local_state.current_channel_idx + 
1) % _writer_count;
     } else {
         // Range partition
         // 1. calculate range
@@ -581,7 +646,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
             channel2rows[i].clear();
         }
     }
-
     if (eos) {
         for (int i = 0; i < num_channels; ++i) {
             if (!channels[i]->is_receiver_eof()) {
@@ -590,7 +654,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
             }
         }
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 9384adbe5f3..17878fc6ead 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -25,6 +25,7 @@
 #include "exchange_sink_buffer.h"
 #include "operator.h"
 #include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
 #include "vec/sink/vdata_stream_sender.h"
 
 namespace doris {
@@ -68,6 +69,21 @@ class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
     using Base = PipelineXSinkLocalState<>;
 
+private:
+    class HashPartitionFunction {
+    public:
+        HashPartitionFunction(vectorized::PartitionerBase* partitioner)
+                : _partitioner(partitioner) {}
+
+        int get_partition(vectorized::Block* block, int position) {
+            uint32_t* partition_ids = 
(uint32_t*)_partitioner->get_channel_ids();
+            return partition_ids[position];
+        }
+
+    private:
+        vectorized::PartitionerBase* _partitioner;
+    };
+
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state),
@@ -132,6 +148,10 @@ public:
     int current_channel_idx; // index of current channel to send to if _random 
== true
     bool only_local_exchange;
 
+    // for external table sink hash partition
+    
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
+            scale_writer_partitioning_exchanger;
+
 private:
     friend class ExchangeSinkOperatorX;
     friend class vectorized::Channel<ExchangeSinkLocalState>;
@@ -209,6 +229,9 @@ private:
     std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
     int64_t _number_input_rows = 0;
     TPartitionType::type _part_type;
+
+    // for external table sink hash partition
+    std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
 };
 
 class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
@@ -274,6 +297,11 @@ private:
     const TTupleId& _tablet_sink_tuple_id;
     int64_t _tablet_sink_txn_id = -1;
     std::shared_ptr<ObjectPool> _pool;
+
+    // for external table sink random partition
+    // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
+    size_t _data_processed = 0;
+    int _writer_count = 1;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.cpp 
b/be/src/pipeline/exec/hive_table_sink_operator.cpp
new file mode 100644
index 00000000000..6b8eaa8c91e
--- /dev/null
+++ b/be/src/pipeline/exec/hive_table_sink_operator.cpp
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hive_table_sink_operator.h"
+
+#include "common/status.h"
+
+namespace doris::pipeline {
+
+OperatorPtr HiveTableSinkOperatorBuilder::build_operator() {
+    return std::make_shared<HiveTableSinkOperator>(this, _sink);
+}
+
+Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    auto& p = _parent->cast<Parent>();
+    RETURN_IF_ERROR(_writer->init_properties(p._pool));
+    return Status::OK();
+}
+
+Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) 
{
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(exec_time_counter());
+    if (_closed) {
+        return _close_status;
+    }
+    _close_status = Base::close(state, exec_status);
+    return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h 
b/be/src/pipeline/exec/hive_table_sink_operator.h
new file mode 100644
index 00000000000..39b5df36567
--- /dev/null
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/vhive_table_sink.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class HiveTableSinkOperatorBuilder final
+        : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> {
+public:
+    HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {}
+
+    OperatorPtr build_operator() override;
+};
+
+class HiveTableSinkOperator final : public 
DataSinkOperator<vectorized::VHiveTableSink> {
+public:
+    HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* 
sink)
+            : DataSinkOperator(operator_builder, sink) {}
+
+    bool can_write() override { return _sink->can_write(); }
+};
+
+class HiveTableSinkOperatorX;
+
+class HiveTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VHiveTableWriter, 
HiveTableSinkOperatorX> {
+public:
+    using Base = AsyncWriterSink<vectorized::VHiveTableWriter, 
HiveTableSinkOperatorX>;
+    using Parent = HiveTableSinkOperatorX;
+    ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState);
+    HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {};
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override {
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_open_timer);
+        return Base::open(state);
+    }
+
+    Status close(RuntimeState* state, Status exec_status) override;
+    friend class HiveTableSinkOperatorX;
+
+private:
+    Status _close_status = Status::OK();
+};
+
+class HiveTableSinkOperatorX final : public 
DataSinkOperatorX<HiveTableSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<HiveTableSinkLocalState>;
+    HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const 
RowDescriptor& row_desc,
+                           const std::vector<TExpr>& t_output_expr)
+            : Base(operator_id, 0),
+              _row_desc(row_desc),
+              _t_output_expr(t_output_expr),
+              _pool(pool) {};
+
+    Status init(const TDataSink& thrift_sink) override {
+        RETURN_IF_ERROR(Base::init(thrift_sink));
+        // From the thrift expressions create the real exprs.
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
+        return Status::OK();
+    }
+
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
+        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc);
+    }
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::open(state));
+        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override {
+        auto& local_state = get_local_state(state);
+        SCOPED_TIMER(local_state.exec_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
+        return local_state.sink(state, in_block, eos);
+    }
+
+private:
+    friend class HiveTableSinkLocalState;
+    template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
+    friend class AsyncWriterSink;
+    const RowDescriptor& _row_desc;
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+    const std::vector<TExpr>& _t_output_expr;
+    ObjectPool* _pool = nullptr;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d1800299559..c0e075d2bfa 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -59,6 +59,7 @@
 #include "pipeline/exec/group_commit_block_sink_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_sink.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep
@@ -821,12 +822,14 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
                                                                       
_sink.get());
         break;
     }
-    case TDataSinkType::MYSQL_TABLE_SINK:
-    case TDataSinkType::JDBC_TABLE_SINK:
-    case TDataSinkType::ODBC_TABLE_SINK: {
-        sink_ = 
std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), 
_sink.get());
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        sink_ = 
std::make_shared<HiveTableSinkOperatorBuilder>(next_operator_builder_id(),
+                                                               _sink.get());
         break;
     }
+    case TDataSinkType::MYSQL_TABLE_SINK:
+    case TDataSinkType::JDBC_TABLE_SINK:
+    case TDataSinkType::ODBC_TABLE_SINK:
     case TDataSinkType::RESULT_FILE_SINK: {
         sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(
                 thrift_sink.result_file_sink.dest_node_id, _sink.get());
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 0c890b83041..f9a30c7e3ac 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -37,6 +37,7 @@
 #include "pipeline/exec/file_scan_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
@@ -541,6 +542,7 @@ DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
 DECLARE_OPERATOR_X(ResultFileSinkLocalState)
 DECLARE_OPERATOR_X(OlapTableSinkLocalState)
 DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
+DECLARE_OPERATOR_X(HiveTableSinkLocalState)
 DECLARE_OPERATOR_X(AnalyticSinkLocalState)
 DECLARE_OPERATOR_X(SortSinkLocalState)
 DECLARE_OPERATOR_X(SpillSortSinkLocalState)
@@ -637,5 +639,6 @@ template class 
AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileS
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VTabletWriter, 
OlapTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, 
OlapTableSinkV2OperatorX>;
+template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, 
HiveTableSinkOperatorX>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 58fc3cb843b..13ad0789e31 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,7 @@
 #include "pipeline/exec/file_scan_operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
@@ -372,6 +373,14 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         }
         break;
     }
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        if (!thrift_sink.__isset.hive_table_sink) {
+            return Status::InternalError("Missing hive table sink.");
+        }
+        _sink.reset(
+                new HiveTableSinkOperatorX(pool, next_sink_operator_id(), 
row_desc, output_exprs));
+        break;
+    }
     case TDataSinkType::JDBC_TABLE_SINK: {
         if (!thrift_sink.__isset.jdbc_table_sink) {
             return Status::InternalError("Missing data jdbc sink.");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f6552f67fca..4657c341e81 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -373,6 +373,24 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
             }
         }
 
+        if (!req.runtime_state->hive_partition_updates().empty()) {
+            params.__isset.hive_partition_updates = true;
+            params.hive_partition_updates.reserve(
+                    req.runtime_state->hive_partition_updates().size());
+            for (auto& hive_partition_update : 
req.runtime_state->hive_partition_updates()) {
+                params.hive_partition_updates.push_back(hive_partition_update);
+            }
+        } else if (!req.runtime_states.empty()) {
+            for (auto* rs : req.runtime_states) {
+                if (!rs->hive_partition_updates().empty()) {
+                    params.__isset.hive_partition_updates = true;
+                    
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
+                                                         
rs->hive_partition_updates().begin(),
+                                                         
rs->hive_partition_updates().end());
+                }
+            }
+        }
+
         // Send new errors to coordinator
         req.runtime_state->get_unreported_errors(&(params.error_log));
         params.__isset.error_log = (params.error_log.size() > 0);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e14721d4859..d031f36fbed 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -452,6 +452,8 @@ public:
 
     std::vector<TTabletCommitInfo>& tablet_commit_infos() { return 
_tablet_commit_infos; }
 
+    std::vector<THivePartitionUpdate>& hive_partition_updates() { return 
_hive_partition_updates; }
+
     const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return 
_error_tablet_infos; }
 
     std::vector<TErrorTabletInfo>& error_tablet_infos() { return 
_error_tablet_infos; }
@@ -726,6 +728,8 @@ private:
     int _max_operator_id = 0;
     int _task_id = -1;
 
+    std::vector<THivePartitionUpdate> _hive_partition_updates;
+
     std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> 
_op_id_to_local_state;
 
     std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> 
_sink_local_state;
diff --git a/be/src/util/indexed_priority_queue.hpp 
b/be/src/util/indexed_priority_queue.hpp
new file mode 100644
index 00000000000..5869d575231
--- /dev/null
+++ b/be/src/util/indexed_priority_queue.hpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// to cpp and modified by Doris
+
+#pragma once
+
+#pragma once
+
+#include <functional>
+#include <iostream>
+#include <map>
+#include <optional>
+#include <set>
+
+/**
+ * A priority queue with constant time contains(E) and log time remove(E)
+ * Ties are broken by insertion order.
+ * LOW_TO_HIGH is the priority order from low to high,
+ * HIGH_TO_LOW is the priority order from high to low.
+ * Those with the same priority are arranged in order of insertion.
+ */
+
+namespace doris {
+
+template <typename T>
+struct IndexedPriorityQueueEntry {
+    T value;
+    long priority;
+    long generation;
+
+    IndexedPriorityQueueEntry(T val, long prio, long gen)
+            : value(std::move(val)), priority(prio), generation(gen) {}
+};
+
+enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };
+
+template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
+struct IndexedPriorityQueueComparator {
+    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
+                    const IndexedPriorityQueueEntry<T>& rhs) const {
+        if constexpr (priority_ordering == 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
+            if (lhs.priority != rhs.priority) {
+                return lhs.priority < rhs.priority;
+            }
+            return lhs.generation < rhs.generation;
+        } else {
+            if (lhs.priority != rhs.priority) {
+                return lhs.priority > rhs.priority;
+            }
+            return lhs.generation < rhs.generation;
+        }
+    }
+};
+
+template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
+                              
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
+class IndexedPriorityQueue {
+public:
+    struct Prioritized {
+        T value;
+        long priority;
+    };
+
+    IndexedPriorityQueue() = default;
+
+    bool add_or_update(T element, long priority) {
+        auto it = _index.find(element);
+        if (it != _index.end()) {
+            if (it->second.priority == priority) {
+                return false;
+            }
+            _queue.erase(it->second);
+        }
+        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, 
generation++};
+        _queue.insert(std::move(entry));
+        _index.insert({entry.value, std::move(entry)});
+        return true;
+    }
+
+    bool contains(const T& element) const { return _index.find(element) != 
_index.end(); }
+
+    bool remove(const T& element) {
+        auto it = _index.find(element);
+        if (it != _index.end()) {
+            _queue.erase(it->second);
+            _index.erase(it);
+            return true;
+        }
+        return false;
+    }
+
+    std::optional<T> poll() {
+        if (_queue.empty()) {
+            return std::nullopt;
+        }
+        T value = _queue.begin()->value;
+        _index.erase(value);
+        _queue.erase(_queue.begin());
+        return value;
+    }
+
+    std::optional<Prioritized> peek() const {
+        if (_queue.empty()) {
+            return std::nullopt;
+        }
+        const IndexedPriorityQueueEntry<T>& entry = *_queue.begin();
+        return Prioritized {entry.value, entry.priority};
+    }
+
+    int size() const { return _queue.size(); }
+
+    bool is_empty() const { return _queue.empty(); }
+
+    class Iterator {
+    public:
+        using iterator_category = std::forward_iterator_tag;
+        using value_type = T;
+        using difference_type = std::ptrdiff_t;
+        using pointer = T*;
+        using reference = T&;
+
+        Iterator() : _iter() {}
+        explicit Iterator(
+                typename std::set<
+                        IndexedPriorityQueueEntry<T>,
+                        IndexedPriorityQueueComparator<T, 
priority_ordering>>::const_iterator iter)
+                : _iter(iter) {}
+
+        const T& operator*() const { return _iter->value; }
+
+        const T* operator->() const { return &(_iter->value); }
+
+        Iterator& operator++() {
+            ++_iter;
+            return *this;
+        }
+
+        Iterator operator++(int) {
+            Iterator tmp = *this;
+            ++(*this);
+            return tmp;
+        }
+
+        bool operator==(const Iterator& other) const { return _iter == 
other._iter; }
+
+        bool operator!=(const Iterator& other) const { return !(*this == 
other); }
+
+    private:
+        typename std::set<IndexedPriorityQueueEntry<T>,
+                          IndexedPriorityQueueComparator<T, 
priority_ordering>>::const_iterator
+                _iter;
+    };
+
+    Iterator begin() const { return Iterator(_queue.begin()); }
+
+    Iterator end() const { return Iterator(_queue.end()); }
+
+private:
+    std::map<T, IndexedPriorityQueueEntry<T>> _index;
+    std::set<IndexedPriorityQueueEntry<T>, IndexedPriorityQueueComparator<T, 
priority_ordering>>
+            _queue;
+
+    long generation = 0;
+};
+
+} // namespace doris
diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp 
b/be/src/vec/exec/skewed_partition_rebalancer.cpp
new file mode 100644
index 00000000000..ae12d365f05
--- /dev/null
+++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <cmath>
+#include <list>
+
+namespace doris::vectorized {
+
+SkewedPartitionRebalancer::SkewedPartitionRebalancer(
+        int partition_count, int task_count, int task_bucket_count,
+        long min_partition_data_processed_rebalance_threshold,
+        long min_data_processed_rebalance_threshold)
+        : _partition_count(partition_count),
+          _task_count(task_count),
+          _task_bucket_count(task_bucket_count),
+          _min_partition_data_processed_rebalance_threshold(
+                  min_partition_data_processed_rebalance_threshold),
+          _min_data_processed_rebalance_threshold(
+                  std::max(min_partition_data_processed_rebalance_threshold,
+                           min_data_processed_rebalance_threshold)),
+          _partition_row_count(partition_count, 0),
+          _data_processed(0),
+          _data_processed_at_last_rebalance(0),
+          _partition_data_size(partition_count, 0),
+          _partition_data_size_at_last_rebalance(partition_count, 0),
+          _partition_data_size_since_last_rebalance_per_task(partition_count, 
0),
+          _estimated_task_bucket_data_size_since_last_rebalance(task_count * 
task_bucket_count, 0),
+          _partition_assignments(partition_count) {
+    std::vector<int> task_bucket_ids(task_count, 0);
+
+    for (int partition = 0; partition < partition_count; partition++) {
+        int task_id = partition % task_count;
+        int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count;
+        TaskBucket task_bucket(task_id, bucket_id, task_bucket_count);
+        _partition_assignments[partition].emplace_back(std::move(task_bucket));
+    }
+}
+
+std::vector<std::list<int>> 
SkewedPartitionRebalancer::get_partition_assignments() {
+    std::vector<std::list<int>> assigned_tasks;
+
+    for (const auto& partition_assignment : _partition_assignments) {
+        std::list<int> tasks;
+        std::transform(partition_assignment.begin(), 
partition_assignment.end(),
+                       std::back_inserter(tasks),
+                       [](const TaskBucket& task_bucket) { return 
task_bucket.task_id; });
+        assigned_tasks.push_back(tasks);
+    }
+
+    return assigned_tasks;
+}
+
+int SkewedPartitionRebalancer::get_task_count() {
+    return _task_count;
+}
+
+int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) {
+    const std::vector<TaskBucket>& task_ids = 
_partition_assignments[partition_id];
+
+    int task_id_index = (index % task_ids.size() + task_ids.size()) % 
task_ids.size();
+
+    return task_ids[task_id_index].task_id;
+}
+
+void SkewedPartitionRebalancer::add_data_processed(long data_size) {
+    _data_processed += data_size;
+}
+
+void SkewedPartitionRebalancer::add_partition_row_count(int partition, long 
row_count) {
+    _partition_row_count[partition] += row_count;
+}
+
+void SkewedPartitionRebalancer::rebalance() {
+    long current_data_processed = _data_processed;
+    if (_should_rebalance(current_data_processed)) {
+        _rebalance_partitions(current_data_processed);
+    }
+}
+
+void SkewedPartitionRebalancer::_calculate_partition_data_size(long 
data_processed) {
+    long total_partition_row_count = 0;
+    for (int partition = 0; partition < _partition_count; partition++) {
+        total_partition_row_count += _partition_row_count[partition];
+    }
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        _partition_data_size[partition] = std::max(
+                (_partition_row_count[partition] * data_processed) / 
total_partition_row_count,
+                _partition_data_size[partition]);
+    }
+}
+
+long 
SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance(
+        IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_partitions) {
+    long estimated_data_size_since_last_rebalance = 0;
+    for (auto& elem : max_partitions) {
+        estimated_data_size_since_last_rebalance +=
+                _partition_data_size_since_last_rebalance_per_task[elem];
+    }
+    return estimated_data_size_since_last_rebalance;
+}
+
+void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
+        IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_task_buckets,
+        IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets,
+        std::vector<IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
+                task_bucket_max_partitions) {
+    std::vector<int> scaled_partitions;
+    while (true) {
+        std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll();
+        if (!max_task_bucket.has_value()) {
+            break;
+        }
+
+        IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_partitions = 
task_bucket_max_partitions[max_task_bucket->id];
+        if (max_partitions.is_empty()) {
+            continue;
+        }
+
+        std::vector<TaskBucket> min_skewed_task_buckets =
+                _find_skewed_min_task_buckets(max_task_bucket.value(), 
min_task_buckets);
+        if (min_skewed_task_buckets.empty()) {
+            break;
+        }
+
+        while (true) {
+            std::optional<int> max_partition = max_partitions.poll();
+            if (!max_partition.has_value()) {
+                break;
+            }
+            int max_partition_value = max_partition.value();
+
+            if (std::find(scaled_partitions.begin(), scaled_partitions.end(),
+                          max_partition_value) != scaled_partitions.end()) {
+                continue;
+            }
+
+            int total_assigned_tasks = 
_partition_assignments[max_partition_value].size();
+            if (_partition_data_size[max_partition_value] >=
+                (_min_partition_data_processed_rebalance_threshold * 
total_assigned_tasks)) {
+                for (const TaskBucket& min_task_bucket : 
min_skewed_task_buckets) {
+                    if (_rebalance_partition(max_partition_value, 
min_task_bucket, max_task_buckets,
+                                             min_task_buckets)) {
+                        scaled_partitions.push_back(max_partition_value);
+                        break;
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+    }
+}
+
+std::vector<SkewedPartitionRebalancer::TaskBucket>
+SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
+        const TaskBucket& max_task_bucket,
+        const IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets) {
+    std::vector<TaskBucket> min_skewed_task_buckets;
+
+    for (const auto& min_task_bucket : min_task_buckets) {
+        double skewness =
+                static_cast<double>(
+                        
_estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] -
+                        
_estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) /
+                
_estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id];
+        if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || 
std::isnan(skewness)) {
+            break;
+        }
+        if (max_task_bucket.task_id != min_task_bucket.task_id) {
+            min_skewed_task_buckets.push_back(min_task_bucket);
+        }
+    }
+    return min_skewed_task_buckets;
+}
+
+bool SkewedPartitionRebalancer::_rebalance_partition(
+        int partition_id, const TaskBucket& to_task_bucket,
+        IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_task_buckets,
+        IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets) {
+    std::vector<TaskBucket>& assignments = 
_partition_assignments[partition_id];
+    if (std::any_of(assignments.begin(), assignments.end(),
+                    [&to_task_bucket](const TaskBucket& task_bucket) {
+                        return task_bucket.task_id == to_task_bucket.task_id;
+                    })) {
+        return false;
+    }
+
+    assignments.push_back(to_task_bucket);
+
+    int new_task_count = assignments.size();
+    int old_task_count = new_task_count - 1;
+    for (const TaskBucket& task_bucket : assignments) {
+        if (task_bucket == to_task_bucket) {
+            
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] +=
+                    
(_partition_data_size_since_last_rebalance_per_task[partition_id] *
+                     old_task_count) /
+                    new_task_count;
+        } else {
+            
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -=
+                    
_partition_data_size_since_last_rebalance_per_task[partition_id] /
+                    new_task_count;
+        }
+        max_task_buckets.add_or_update(
+                task_bucket, 
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
+        min_task_buckets.add_or_update(
+                task_bucket, 
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
+    }
+
+    return true;
+}
+
+bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) {
+    return (data_processed - _data_processed_at_last_rebalance) >=
+           _min_data_processed_rebalance_threshold;
+}
+
+void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
+    if (!_should_rebalance(data_processed)) {
+        return;
+    }
+
+    _calculate_partition_data_size(data_processed);
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        int total_assigned_tasks = _partition_assignments[partition].size();
+        long data_size = _partition_data_size[partition];
+        _partition_data_size_since_last_rebalance_per_task[partition] =
+                (data_size - 
_partition_data_size_at_last_rebalance[partition]) /
+                total_assigned_tasks;
+        _partition_data_size_at_last_rebalance[partition] = data_size;
+    }
+
+    std::vector<IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>
+            task_bucket_max_partitions;
+
+    for (int i = 0; i < _task_count * _task_bucket_count; ++i) {
+        task_bucket_max_partitions.push_back(
+                IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>());
+    }
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        auto& taskAssignments = _partition_assignments[partition];
+        for (const auto& taskBucket : taskAssignments) {
+            auto& queue = task_bucket_max_partitions[taskBucket.id];
+            queue.add_or_update(partition,
+                                
_partition_data_size_since_last_rebalance_per_task[partition]);
+        }
+    }
+
+    IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
+            max_task_buckets;
+    IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>
+            min_task_buckets;
+
+    for (int taskId = 0; taskId < _task_count; taskId++) {
+        for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) {
+            TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count);
+            TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count);
+            
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] =
+                    _calculate_task_bucket_data_size_since_last_rebalance(
+                            task_bucket_max_partitions[task_bucket1.id]);
+            max_task_buckets.add_or_update(
+                    std::move(task_bucket1),
+                    
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]);
+            min_task_buckets.add_or_update(
+                    std::move(task_bucket2),
+                    
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]);
+        }
+    }
+
+    _rebalance_based_on_task_bucket_skewness(max_task_buckets, 
min_task_buckets,
+                                             task_bucket_max_partitions);
+    _data_processed_at_last_rebalance = data_processed;
+}
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h 
b/be/src/vec/exec/skewed_partition_rebalancer.h
new file mode 100644
index 00000000000..814ebc1d465
--- /dev/null
+++ b/be/src/vec/exec/skewed_partition_rebalancer.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+/**
+ * Helps in distributing big or skewed partitions across available tasks to 
improve the performance of
+ * partitioned writes.
+ * <p>
+ * This rebalancer initialize a bunch of buckets for each task based on a 
given taskBucketCount and then tries to
+ * uniformly distribute partitions across those buckets. This helps to 
mitigate two problems:
+ * 1. Mitigate skewness across tasks.
+ * 2. Scale few big partitions across tasks even if there's no skewness among 
them. This will essentially speed the
+ *    local scaling without impacting much overall resource utilization.
+ * <p>
+ * Example:
+ * <p>
+ * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions
+ * Task1                Task2               Task3
+ * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1
+ * Bucket2              Bucket2             Bucket2
+ * Bucket3              Bucket3             Bucket3
+ * <p>
+ * After rebalancing:
+ * Task1                Task2               Task3
+ * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1 (Part 1)
+ * Bucket2 (Part 2)     Bucket2 (Part 1)    Bucket2 (Part 2)
+ * Bucket3              Bucket3             Bucket3
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <iostream>
+#include <list>
+#include <optional>
+#include <vector>
+
+#include "util/indexed_priority_queue.hpp"
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancer {
+private:
+    struct TaskBucket {
+        int task_id;
+        int id;
+
+        TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
+                : task_id(task_id_), id(task_id_ * task_bucket_count_ + 
bucket_id_) {}
+
+        bool operator==(const TaskBucket& other) const { return id == 
other.id; }
+
+        bool operator<(const TaskBucket& other) const { return id < other.id; }
+
+        bool operator>(const TaskBucket& other) const { return id > other.id; }
+    };
+
+public:
+    SkewedPartitionRebalancer(int partition_count, int task_count, int 
task_bucket_count,
+                              long 
min_partition_data_processed_rebalance_threshold,
+                              long min_data_processed_rebalance_threshold);
+
+    std::vector<std::list<int>> get_partition_assignments();
+    int get_task_count();
+    int get_task_id(int partition_id, int64_t index);
+    void add_data_processed(long data_size);
+    void add_partition_row_count(int partition, long row_count);
+    void rebalance();
+
+private:
+    void _calculate_partition_data_size(long data_processed);
+    long _calculate_task_bucket_data_size_since_last_rebalance(
+            IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_partitions);
+    void _rebalance_based_on_task_bucket_skewness(
+            IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_task_buckets,
+            IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets,
+            std::vector<
+                    IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
+                    task_bucket_max_partitions);
+    std::vector<TaskBucket> _find_skewed_min_task_buckets(
+            const TaskBucket& max_task_bucket,
+            const IndexedPriorityQueue<TaskBucket,
+                                       
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets);
+    bool _rebalance_partition(
+            int partition_id, const TaskBucket& to_task_bucket,
+            IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_task_buckets,
+            IndexedPriorityQueue<TaskBucket, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets);
+
+    bool _should_rebalance(long data_processed);
+    void _rebalance_partitions(long data_processed);
+
+private:
+    static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7;
+
+    int _partition_count;
+    int _task_count;
+    int _task_bucket_count;
+    long _min_partition_data_processed_rebalance_threshold;
+    long _min_data_processed_rebalance_threshold;
+    std::vector<long> _partition_row_count;
+    long _data_processed;
+    long _data_processed_at_last_rebalance;
+    std::vector<long> _partition_data_size;
+    std::vector<long> _partition_data_size_at_last_rebalance;
+    std::vector<long> _partition_data_size_since_last_rebalance_per_task;
+    std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance;
+
+    std::vector<std::vector<TaskBucket>> _partition_assignments;
+};
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/runtime/vorc_transformer.cpp 
b/be/src/vec/runtime/vorc_transformer.cpp
index be52b672aa8..764a97ae5bc 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -99,16 +99,33 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, 
doris::io::FileWriter* fil
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _file_writer(file_writer),
           _write_options(new orc::WriterOptions()),
-          _schema_str(schema) {
+          _schema_str(&schema),
+          _schema(nullptr) {
     _write_options->setTimezoneName(_state->timezone());
     _write_options->setUseTightNumericVector(true);
 }
 
+VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* 
file_writer,
+                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 std::unique_ptr<orc::Type> schema, bool 
output_object_data,
+                                 orc::CompressionKind compression)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _write_options(new orc::WriterOptions()),
+          _schema_str(nullptr),
+          _schema(std::move(schema)) {
+    _write_options->setTimezoneName(_state->timezone());
+    _write_options->setUseTightNumericVector(true);
+    _write_options->setCompression(compression);
+}
+
 Status VOrcTransformer::open() {
     try {
-        _schema = orc::Type::buildTypeFromString(_schema_str);
+        if (_schema == nullptr && _schema_str != nullptr) {
+            _schema = orc::Type::buildTypeFromString(*_schema_str);
+        }
     } catch (const std::exception& e) {
-        return Status::InternalError("Orc build schema from \"{}\" failed: 
{}", _schema_str,
+        return Status::InternalError("Orc build schema from \"{}\" failed: 
{}", *_schema_str,
                                      e.what());
     }
     _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
diff --git a/be/src/vec/runtime/vorc_transformer.h 
b/be/src/vec/runtime/vorc_transformer.h
index 4b8ea178ca9..8cfc956c0cd 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -78,6 +78,10 @@ public:
                     const VExprContextSPtrs& output_vexpr_ctxs, const 
std::string& schema,
                     bool output_object_data);
 
+    VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                    const VExprContextSPtrs& output_vexpr_ctxs, 
std::unique_ptr<orc::Type> schema,
+                    bool output_object_data, orc::CompressionKind compression);
+
     ~VOrcTransformer() = default;
 
     Status open() override;
@@ -99,7 +103,7 @@ private:
     doris::io::FileWriter* _file_writer = nullptr;
     std::unique_ptr<orc::OutputStream> _output_stream;
     std::unique_ptr<orc::WriterOptions> _write_options;
-    const std::string& _schema_str;
+    const std::string* _schema_str;
     std::unique_ptr<orc::Type> _schema;
     std::unique_ptr<orc::Writer> _writer;
 
diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp 
b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
new file mode 100644
index 00000000000..f7435249c20
--- /dev/null
+++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <functional>
+#include <iostream>
+#include <vector>
+
+#include "vec/core/block.h"
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+namespace doris::vectorized {
+
+template <typename PartitionFunction>
+class ScaleWriterPartitioningExchanger {
+public:
+    ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& 
partition_function,
+                                     int partition_count, int task_count, int 
task_bucket_count,
+                                     long 
min_partition_data_processed_rebalance_threshold,
+                                     long 
min_data_processed_rebalance_threshold)
+            : _channel_size(channel_size),
+              _partition_function(partition_function),
+              _partition_rebalancer(partition_count, task_count, 
task_bucket_count,
+                                    
min_partition_data_processed_rebalance_threshold,
+                                    min_data_processed_rebalance_threshold),
+              _partition_row_counts(partition_count, 0),
+              _partition_writer_ids(partition_count, -1),
+              _partition_writer_indexes(partition_count, 0) {}
+
+    std::vector<std::vector<uint32_t>> accept(Block* block) {
+        std::vector<std::vector<uint32_t>> writerAssignments(_channel_size,
+                                                             
std::vector<uint32_t>());
+        for (int partition_id = 0; partition_id < 
_partition_row_counts.size(); partition_id++) {
+            _partition_row_counts[partition_id] = 0;
+            _partition_writer_ids[partition_id] = -1;
+        }
+
+        _partition_rebalancer.rebalance();
+
+        for (int position = 0; position < block->rows(); position++) {
+            int partition_id = _partition_function.get_partition(block, 
position);
+            _partition_row_counts[partition_id] += 1;
+
+            // Get writer id for this partition by looking at the scaling state
+            int writer_id = _partition_writer_ids[partition_id];
+            if (writer_id == -1) {
+                writer_id = get_next_writer_id(partition_id);
+                _partition_writer_ids[partition_id] = writer_id;
+            }
+            writerAssignments[writer_id].push_back(position);
+        }
+
+        for (int partition_id = 0; partition_id < 
_partition_row_counts.size(); partition_id++) {
+            _partition_rebalancer.add_partition_row_count(partition_id,
+                                                          
_partition_row_counts[partition_id]);
+        }
+        _partition_rebalancer.add_data_processed(block->bytes());
+
+        return writerAssignments;
+    }
+
+    int get_next_writer_id(int partition_id) {
+        return _partition_rebalancer.get_task_id(partition_id,
+                                                 
_partition_writer_indexes[partition_id]++);
+    }
+
+private:
+    int _channel_size;
+    PartitionFunction& _partition_function;
+    SkewedPartitionRebalancer _partition_rebalancer;
+    std::vector<int> _partition_row_counts;
+    std::vector<int> _partition_writer_ids;
+    std::vector<int> _partition_writer_indexes;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vhive_table_sink.cpp 
b/be/src/vec/sink/vhive_table_sink.cpp
new file mode 100644
index 00000000000..0fba50cef69
--- /dev/null
+++ b/be/src/vec/sink/vhive_table_sink.cpp
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vhive_table_sink.h"
+
+namespace doris {
+class TExpr;
+
+namespace vectorized {
+
+VHiveTableSink::VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                               const std::vector<TExpr>& texprs)
+        : AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK>(row_desc, 
texprs), _pool(pool) {}
+
+VHiveTableSink::~VHiveTableSink() = default;
+
+Status VHiveTableSink::init(const TDataSink& t_sink) {
+    RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
+    RETURN_IF_ERROR(_writer->init_properties(_pool));
+    return Status::OK();
+}
+
+Status VHiveTableSink::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(_exec_timer);
+    if (_closed) {
+        return _close_status;
+    }
+    RETURN_IF_ERROR(DataSink::close(state, exec_status));
+    _close_status = AsyncWriterSink::close(state, exec_status);
+    return _close_status;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vhive_table_sink.h 
b/be/src/vec/sink/vhive_table_sink.h
new file mode 100644
index 00000000000..d7b9c3fc856
--- /dev/null
+++ b/be/src/vec/sink/vhive_table_sink.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "vec/sink/async_writer_sink.h"
+#include "vec/sink/writer/vhive_table_writer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RowDescriptor;
+
+namespace vectorized {
+
+inline constexpr char VHIVE_TABLE_SINK[] = "VHiveTableSink";
+
+class VHiveTableSink final : public AsyncWriterSink<VHiveTableWriter, 
VHIVE_TABLE_SINK> {
+public:
+    // Construct from thrift struct which is generated by FE.
+    VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                   const std::vector<TExpr>& texprs);
+
+    ~VHiveTableSink() override;
+
+    Status init(const TDataSink& sink) override;
+
+    Status close(RuntimeState* state, Status exec_status) override;
+
+private:
+    ObjectPool* _pool = nullptr;
+
+    Status _close_status = Status::OK();
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
new file mode 100644
index 00000000000..38668abb3fc
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_partition_writer.h"
+
+#include "io/file_factory.h"
+#include "io/fs/file_system.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/materialize_block.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
+
+namespace doris {
+namespace vectorized {
+
+VHivePartitionWriter::VHivePartitionWriter(
+        const TDataSink& t_sink, const std::string partition_name, 
TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const 
std::vector<THiveColumn>& columns,
+        WriteInfo write_info, const std::string file_name, 
TFileFormatType::type file_format_type,
+        TFileCompressType::type hive_compress_type,
+        const std::map<std::string, std::string>& hadoop_conf)
+        : _partition_name(std::move(partition_name)),
+          _update_mode(update_mode),
+          _vec_output_expr_ctxs(output_expr_ctxs),
+          _columns(columns),
+          _write_info(std::move(write_info)),
+          _file_name(std::move(file_name)),
+          _file_format_type(file_format_type),
+          _hive_compress_type(hive_compress_type),
+          _hadoop_conf(hadoop_conf)
+
+{}
+
+Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* 
profile) {
+    _state = state;
+
+    std::vector<TNetworkAddress> broker_addresses;
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            _write_info.file_type, state->exec_env(), broker_addresses, 
_hadoop_conf,
+            fmt::format("{}/{}", _write_info.write_path, _file_name), 0, 
_file_writer_impl));
+
+    switch (_file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        bool parquet_disable_dictionary = false;
+        TParquetCompressionType::type parquet_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            parquet_compression_type = TParquetCompressionType::UNCOMPRESSED;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            parquet_compression_type = TParquetCompressionType::SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            parquet_compression_type = TParquetCompressionType::ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported hive compress type {} 
with parquet",
+                                         to_string(_hive_compress_type));
+        }
+        }
+        std::vector<TParquetSchema> parquet_schemas;
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            TParquetSchema parquet_schema;
+            parquet_schema.schema_column_name = _columns[i].name;
+            parquet_schemas.emplace_back(std::move(parquet_schema));
+        }
+        _vfile_writer.reset(new VParquetTransformer(
+                state, _file_writer_impl.get(), _vec_output_expr_ctxs, 
parquet_schemas,
+                parquet_compression_type, parquet_disable_dictionary, 
TParquetVersion::PARQUET_1_0,
+                false));
+        return _vfile_writer->open();
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        orc::CompressionKind orc_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            orc_compression_type = 
orc::CompressionKind::CompressionKind_SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZLIB: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported type {} with orc", 
_hive_compress_type);
+        }
+        }
+        orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+
+        std::unique_ptr<orc::Type> root_schema = orc::createStructType();
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            try {
+                root_schema->addStructField(_columns[i].name, 
_build_orc_type(column_expr->type()));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+        }
+
+        _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
+                                                _vec_output_expr_ctxs, 
std::move(root_schema),
+                                                false, orc_compression_type));
+        return _vfile_writer->open();
+    }
+    default: {
+        return Status::InternalError("Unsupported file format type {}",
+                                     to_string(_file_format_type));
+    }
+    }
+}
+
+Status VHivePartitionWriter::close(Status status) {
+    if (_vfile_writer != nullptr) {
+        Status st = _vfile_writer->close();
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: 
{}", st.to_string());
+        }
+    }
+    if (status != Status::OK()) {
+        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+        Status st = _file_writer_impl->fs()->delete_file(path);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", 
path, st.to_string());
+        }
+    }
+    _state->hive_partition_updates().emplace_back(_build_partition_update());
+    return Status::OK();
+}
+
+Status VHivePartitionWriter::write(vectorized::Block& block, 
vectorized::IColumn::Filter* filter) {
+    Block output_block;
+    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, 
&output_block));
+    RETURN_IF_ERROR(_vfile_writer->write(output_block));
+    _row_count += output_block.rows();
+    _input_size_in_bytes += output_block.bytes();
+    return Status::OK();
+}
+
+std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
+        const TypeDescriptor& type_descriptor) {
+    std::pair<Status, std::unique_ptr<orc::Type>> result;
+    switch (type_descriptor.type) {
+    case TYPE_BOOLEAN: {
+        return orc::createPrimitiveType(orc::BOOLEAN);
+    }
+    case TYPE_TINYINT: {
+        return orc::createPrimitiveType(orc::BYTE);
+    }
+    case TYPE_SMALLINT: {
+        return orc::createPrimitiveType(orc::SHORT);
+    }
+    case TYPE_INT: {
+        return orc::createPrimitiveType(orc::INT);
+    }
+    case TYPE_BIGINT: {
+        return orc::createPrimitiveType(orc::LONG);
+    }
+    case TYPE_FLOAT: {
+        return orc::createPrimitiveType(orc::FLOAT);
+    }
+    case TYPE_DOUBLE: {
+        return orc::createPrimitiveType(orc::DOUBLE);
+    }
+    case TYPE_CHAR: {
+        return orc::createCharType(orc::CHAR, type_descriptor.len);
+    }
+    case TYPE_VARCHAR: {
+        return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+    }
+    case TYPE_STRING: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_BINARY: {
+        return orc::createPrimitiveType(orc::STRING);
+    }
+    case TYPE_DATEV2: {
+        return orc::createPrimitiveType(orc::DATE);
+    }
+    case TYPE_DATETIMEV2: {
+        return orc::createPrimitiveType(orc::TIMESTAMP);
+    }
+    case TYPE_DECIMAL32: {
+        return orc::createDecimalType(type_descriptor.precision, 
type_descriptor.scale);
+    }
+    case TYPE_DECIMAL64: {
+        return orc::createDecimalType(type_descriptor.precision, 
type_descriptor.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        return orc::createDecimalType(type_descriptor.precision, 
type_descriptor.scale);
+    }
+    case TYPE_STRUCT: {
+        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+        for (int j = 0; j < type_descriptor.children.size(); ++j) {
+            struct_type->addStructField(type_descriptor.field_names[j],
+                                        
_build_orc_type(type_descriptor.children[j]));
+        }
+        return struct_type;
+    }
+    case TYPE_ARRAY: {
+        return 
orc::createListType(_build_orc_type(type_descriptor.children[0]));
+    }
+    case TYPE_MAP: {
+        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+                                  
_build_orc_type(type_descriptor.children[1]));
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type {} to build orc type",
+                               type_descriptor.debug_string());
+    }
+    }
+}
+
+Status 
VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& 
input_block,
+                                                          const 
vectorized::IColumn::Filter* filter,
+                                                          
doris::vectorized::Block* output_block) {
+    Status status = Status::OK();
+    if (input_block.rows() == 0) {
+        return status;
+    }
+    
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _vec_output_expr_ctxs, input_block, output_block));
+    materialize_block_inplace(*output_block);
+
+    if (filter == nullptr) {
+        return status;
+    }
+
+    std::vector<uint32_t> columns_to_filter;
+    int column_to_keep = input_block.columns();
+    columns_to_filter.resize(column_to_keep);
+    for (uint32_t i = 0; i < column_to_keep; ++i) {
+        columns_to_filter[i] = i;
+    }
+
+    Block::filter_block_internal(output_block, columns_to_filter, *filter);
+
+    return status;
+}
+
+THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
+    THivePartitionUpdate hive_partition_update;
+    hive_partition_update.__set_name(_partition_name);
+    hive_partition_update.__set_update_mode(_update_mode);
+    THiveLocationParams location;
+    location.__set_write_path(_write_info.write_path);
+    location.__set_target_path(_write_info.target_path);
+    hive_partition_update.__set_location(location);
+    hive_partition_update.__set_file_names({_file_name});
+    hive_partition_update.__set_row_count(_row_count);
+    hive_partition_update.__set_file_size(_input_size_in_bytes);
+    return hive_partition_update;
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h 
b/be/src/vec/sink/writer/vhive_partition_writer.h
new file mode 100644
index 00000000000..afee81aa499
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string target_path;
+        TFileType::type file_type;
+    };
+
+    VHivePartitionWriter(const TDataSink& t_sink, const std::string 
partition_name,
+                         TUpdateMode::type update_mode, const 
VExprContextSPtrs& output_expr_ctxs,
+                         const std::vector<THiveColumn>& columns, WriteInfo 
write_info,
+                         const std::string file_name, TFileFormatType::type 
file_format_type,
+                         TFileCompressType::type hive_compress_type,
+                         const std::map<std::string, std::string>& 
hadoop_conf);
+
+    Status init_properties(ObjectPool* pool) { return Status::OK(); }
+
+    Status open(RuntimeState* state, RuntimeProfile* profile);
+
+    Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr);
+
+    Status close(Status);
+
+    inline size_t written_len() { return _vfile_writer->written_len(); }
+
+private:
+    std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& 
type_descriptor);
+
+    Status _projection_and_filter_block(doris::vectorized::Block& input_block,
+                                        const vectorized::IColumn::Filter* 
filter,
+                                        doris::vectorized::Block* 
output_block);
+
+    THivePartitionUpdate _build_partition_update();
+
+    std::string _path;
+
+    std::string _partition_name;
+
+    TUpdateMode::type _update_mode;
+
+    size_t _row_count = 0;
+    size_t _input_size_in_bytes = 0;
+
+    const VExprContextSPtrs& _vec_output_expr_ctxs;
+
+    const std::vector<THiveColumn>& _columns;
+    WriteInfo _write_info;
+    std::string _file_name;
+    TFileFormatType::type _file_format_type;
+    TFileCompressType::type _hive_compress_type;
+    const std::map<std::string, std::string>& _hadoop_conf;
+
+    // If the result file format is plain text, like CSV, this _file_writer is 
owned by this FileResultWriter.
+    // If the result file format is Parquet, this _file_writer is owned by 
_parquet_writer.
+    std::unique_ptr<doris::io::FileWriter> _file_writer_impl = nullptr;
+    // convert block to parquet/orc/csv format
+    std::unique_ptr<VFileFormatTransformer> _vfile_writer = nullptr;
+
+    RuntimeState* _state;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp 
b/be/src/vec/sink/writer/vhive_table_writer.cpp
new file mode 100644
index 00000000000..1c123c92b3a
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+    return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == 
THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> 
writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = 
_create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > 
config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    RETURN_IF_ERROR(writer->write(block));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, 
partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> 
Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > 
config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, 
writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); 
++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Unsupported type for partition {}", 
st.to_string());
+            continue;
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
+std::shared_ptr<VHivePartitionWriter> 
VHiveTableWriter::_create_partition_writer(
+        vectorized::Block& block, int position) {
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+    std::vector<std::string> partition_values;
+    std::string partition_name;
+    if (!_partition_columns_input_index.empty()) {
+        partition_values = _create_partition_values(block, position);
+        partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, 
partition_values);
+    }
+    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
+    const THiveLocationParams& write_location = hive_table_sink.location;
+    const THivePartition* existing_partition = nullptr;
+    bool existing_table = true;
+    for (const auto& partition : partitions) {
+        if (partition_values == partition.values) {
+            existing_partition = &partition;
+            break;
+        }
+    }
+    TUpdateMode::type update_mode;
+    VHivePartitionWriter::WriteInfo write_info;
+    TFileFormatType::type file_format_type;
+    TFileCompressType::type write_compress_type;
+    if (existing_partition == nullptr) { // new partition
+        if (existing_table == false) {   // new table
+            update_mode = TUpdateMode::NEW;
+            if (_partition_columns_input_index.empty()) { // new unpartitioned 
table
+                write_info = {write_location.write_path, 
write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in a new partitioned table
+                auto write_path = fmt::format("{}/{}", 
write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+        } else { // a new partition in an existing partitioned table, or an 
existing unpartitioned table
+            if (_partition_columns_input_index.empty()) { // an existing 
unpartitioned table
+                update_mode =
+                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : 
TUpdateMode::OVERWRITE;
+                write_info = {write_location.write_path, 
write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in an existing partitioned table
+                update_mode = TUpdateMode::NEW;
+                auto write_path = fmt::format("{}/{}", 
write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+            // need to get schema from existing table ?
+        }
+        file_format_type = hive_table_sink.file_format;
+        write_compress_type = hive_table_sink.compression_type;
+    } else { // existing partition
+        if (!hive_table_sink.overwrite) {
+            update_mode = TUpdateMode::APPEND;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, 
partition_name);
+            auto target_path = fmt::format("{}", 
existing_partition->location.target_path);
+            write_info = {std::move(write_path), std::move(target_path),
+                          existing_partition->location.file_type};
+            file_format_type = existing_partition->file_format;
+            write_compress_type = hive_table_sink.compression_type;
+        } else {
+            update_mode = TUpdateMode::OVERWRITE;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, 
partition_name);
+            auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
+            write_info = {std::move(write_path), std::move(target_path), 
write_location.file_type};
+            file_format_type = hive_table_sink.file_format;
+            write_compress_type = hive_table_sink.compression_type;
+            // need to get schema from existing table ?
+        }
+    }
+
+    return std::make_shared<VHivePartitionWriter>(
+            _t_sink, std::move(partition_name), update_mode, 
_vec_output_expr_ctxs,
+            hive_table_sink.columns, std::move(write_info),
+            fmt::format("{}{}", _compute_file_name(),
+                        _get_file_extension(file_format_type, 
write_compress_type)),
+            file_format_type, write_compress_type, 
hive_table_sink.hadoop_config);
+}
+
+std::vector<std::string> 
VHiveTableWriter::_create_partition_values(vectorized::Block& block,
+                                                                    int 
position) {
+    std::vector<std::string> partition_values;
+    for (int i = 0; i < _partition_columns_input_index.size(); ++i) {
+        int partition_column_idx = _partition_columns_input_index[i];
+        vectorized::ColumnWithTypeAndName partition_column =
+                block.get_by_position(partition_column_idx);
+        std::string value =
+                
_to_partition_value(_vec_output_expr_ctxs[partition_column_idx]->root()->type(),
+                                    partition_column, position);
+
+        // Check if value contains only printable ASCII characters
+        bool isValid = true;
+        for (char c : value) {
+            if (c < 0x20 || c > 0x7E) {
+                isValid = false;
+                break;
+            }
+        }
+
+        if (!isValid) {
+            // Encode value using Base16 encoding with space separator
+            std::stringstream encoded;
+            for (unsigned char c : value) {
+                encoded << std::hex << std::setw(2) << std::setfill('0') << 
(int)c;
+                encoded << " ";
+            }
+            throw doris::Exception(
+                    doris::ErrorCode::INTERNAL_ERROR,
+                    "Hive partition values can only contain printable ASCII 
characters (0x20 - "
+                    "0x7E). Invalid value: {}",
+                    encoded.str());
+        }
+
+        partition_values.emplace_back(value);
+    }
+
+    return partition_values;
+}
+
+std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& 
type_desc,
+                                                  const ColumnWithTypeAndName& 
partition_column,
+                                                  int position) {
+    ColumnPtr column;
+    if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*partition_column.column)) {
+        auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
+        if (null_map_data[position]) {
+            return "__HIVE_DEFAULT_PARTITION__";
+        }
+        column = nullable_column->get_nested_column_ptr();
+    } else {
+        column = partition_column.column;
+    }
+    auto [item, size] = column->get_data_at(position);
+    switch (type_desc.type) {
+    case TYPE_BOOLEAN: {
+        vectorized::Field field =
+                vectorized::check_and_get_column<const 
ColumnUInt8>(*column)->operator[](position);
+        return std::to_string(field.get<bool>());
+    }
+    case TYPE_TINYINT: {
+        return std::to_string(*reinterpret_cast<const Int8*>(item));
+    }
+    case TYPE_SMALLINT: {
+        return std::to_string(*reinterpret_cast<const Int16*>(item));
+    }
+    case TYPE_INT: {
+        return std::to_string(*reinterpret_cast<const Int32*>(item));
+    }
+    case TYPE_BIGINT: {
+        return std::to_string(*reinterpret_cast<const Int64*>(item));
+    }
+    case TYPE_FLOAT: {
+        return std::to_string(*reinterpret_cast<const Float32*>(item));
+    }
+    case TYPE_DOUBLE: {
+        return std::to_string(*reinterpret_cast<const Float64*>(item));
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        return std::string(item, size);
+    }
+    case TYPE_DATE: {
+        VecDateTimeValue value = binary_cast<int64_t, 
doris::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DATETIME: {
+        VecDateTimeValue value = binary_cast<int64_t, 
doris::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+        break;
+    }
+    case TYPE_DATEV2: {
+        DateV2Value<DateV2ValueType> value =
+                binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(int32_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DATETIMEV2: {
+        DateV2Value<DateTimeV2ValueType> value =
+                binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf, type_desc.scale);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DECIMALV2: {
+        Decimal128V2 value = *(Decimal128V2*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL32: {
+        Decimal32 value = *(Decimal32*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL64: {
+        Decimal64 value = *(Decimal64*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        Decimal128V3 value = *(Decimal128V3*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL256: {
+        Decimal256 value = *(Decimal256*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type for partition {}", 
type_desc.debug_string());
+    }
+    }
+}
+
+std::string VHiveTableWriter::_get_file_extension(TFileFormatType::type 
file_format_type,
+                                                  TFileCompressType::type 
write_compress_type) {
+    std::string compress_name;
+    switch (write_compress_type) {
+    case TFileCompressType::SNAPPYBLOCK: {
+        compress_name = ".snappy";
+        break;
+    }
+    case TFileCompressType::ZLIB: {
+        compress_name = ".zlib";
+        break;
+    }
+    case TFileCompressType::ZSTD: {
+        compress_name = ".zstd";
+        break;
+    }
+    default: {
+        compress_name = "";
+        break;
+    }
+    }
+
+    std::string file_format_name;
+    switch (file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        file_format_name = ".parquet";
+        break;
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        file_format_name = ".orc";
+        break;
+    }
+    default: {
+        file_format_name = "";
+        break;
+    }
+    }
+    return fmt::format("{}{}", compress_name, file_format_name);
+}
+
+std::string VHiveTableWriter::_compute_file_name() {
+    boost::uuids::uuid uuid = boost::uuids::random_generator()();
+
+    std::string uuid_str = boost::uuids::to_string(uuid);
+
+    return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str);
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h 
b/be/src/vec/sink/writer/vhive_table_writer.h
new file mode 100644
index 00000000000..a4681b32e3f
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/writer/async_result_writer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+struct TypeDescriptor;
+
+namespace vectorized {
+
+class Block;
+class VHivePartitionWriter;
+struct ColumnWithTypeAndName;
+
+class VHiveTableWriter final : public AsyncResultWriter {
+public:
+    VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+
+    ~VHiveTableWriter() = default;
+
+    Status init_properties(ObjectPool* pool);
+
+    Status open(RuntimeState* state, RuntimeProfile* profile) override;
+
+    Status write(vectorized::Block& block) override;
+
+    Status close(Status) override;
+
+private:
+    std::shared_ptr<VHivePartitionWriter> 
_create_partition_writer(vectorized::Block& block,
+                                                                   int 
position);
+
+    std::vector<std::string> _create_partition_values(vectorized::Block& 
block, int position);
+
+    std::string _to_partition_value(const TypeDescriptor& type_desc,
+                                    const ColumnWithTypeAndName& 
partition_column, int position);
+
+    std::string _get_file_extension(TFileFormatType::type file_format_type,
+                                    TFileCompressType::type 
write_compress_type);
+
+    std::string _compute_file_name();
+
+    // Currently it is a copy, maybe it is better to use move semantics to 
eliminate it.
+    TDataSink _t_sink;
+    RuntimeState* _state = nullptr;
+    RuntimeProfile* _profile = nullptr;
+    std::vector<int> _partition_columns_input_index;
+    std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> 
_partitions_to_writers;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_utils.cpp 
b/be/src/vec/sink/writer/vhive_utils.cpp
new file mode 100644
index 00000000000..9a97b893775
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_utils.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_utils.h"
+
+#include <algorithm>
+#include <regex>
+#include <sstream>
+
+namespace doris {
+namespace vectorized {
+
+const std::regex 
VHiveUtils::PATH_CHAR_TO_ESCAPE("[\\x00-\\x1F\"#%'*/:=?\\\\\\x7F\\{\\[\\]\\^]");
+
+std::string VHiveUtils::make_partition_name(const std::vector<THiveColumn>& 
columns,
+                                            const std::vector<int>& 
partition_columns_input_index,
+                                            const std::vector<std::string>& 
values) {
+    std::stringstream partition_name_stream;
+
+    for (size_t i = 0; i < partition_columns_input_index.size(); i++) {
+        if (i > 0) {
+            partition_name_stream << '/';
+        }
+        std::string column = columns[partition_columns_input_index[i]].name;
+        std::string value = values[i];
+        std::transform(column.begin(), column.end(), column.begin(),
+                       [&](char c) { return std::tolower(c); });
+        partition_name_stream << escape_path_name(column) << '=' << 
escape_path_name(value);
+    }
+
+    return partition_name_stream.str();
+}
+
+std::string VHiveUtils::escape_path_name(const std::string& path) {
+    if (path.empty()) {
+        return "__HIVE_DEFAULT_PARTITION__";
+    }
+
+    std::smatch match;
+    if (!std::regex_search(path, match, PATH_CHAR_TO_ESCAPE)) {
+        return path;
+    }
+
+    std::stringstream ss;
+    size_t from_index = 0;
+    auto begin = path.begin();
+    auto end = path.end();
+    while (std::regex_search(begin + from_index, end, match, 
PATH_CHAR_TO_ESCAPE)) {
+        size_t escape_at_index = match.position() + from_index;
+        if (escape_at_index > from_index) {
+            ss << path.substr(from_index, escape_at_index - from_index);
+        }
+        char c = path[escape_at_index];
+        ss << '%' << std::hex << std::uppercase << static_cast<int>(c >> 4)
+           << static_cast<int>(c & 0xF);
+        from_index = escape_at_index + 1;
+    }
+    if (from_index < path.length()) {
+        ss << path.substr(from_index);
+    }
+    return ss.str();
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vhive_utils.h 
b/be/src/vec/sink/writer/vhive_utils.h
new file mode 100644
index 00000000000..49cd45dc3a8
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_utils.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include <algorithm>
+#include <iostream>
+#include <regex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace doris {
+namespace vectorized {
+
+class VHiveUtils {
+public:
+    VHiveUtils() = delete;
+
+    static const std::regex PATH_CHAR_TO_ESCAPE;
+
+    static std::string make_partition_name(const std::vector<THiveColumn>& 
columns,
+                                           const std::vector<int>& 
partition_columns_input_index,
+                                           const std::vector<std::string>& 
values);
+
+    static std::string escape_path_name(const std::string& path);
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/util/indexed_priority_queue_test.cpp 
b/be/test/util/indexed_priority_queue_test.cpp
new file mode 100644
index 00000000000..54686fc0194
--- /dev/null
+++ b/be/test/util/indexed_priority_queue_test.cpp
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/indexed_priority_queue.hpp"
+
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class IndexedPriorityQueueTest : public testing::Test {
+public:
+    IndexedPriorityQueueTest() = default;
+    virtual ~IndexedPriorityQueueTest() = default;
+};
+
+TEST_F(IndexedPriorityQueueTest, test_high_to_low) {
+    IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> pq;
+
+    pq.add_or_update(3, 10);
+    pq.add_or_update(1, 5);
+    pq.add_or_update(4, 15);
+    pq.add_or_update(2, 8);
+    pq.add_or_update(5, 5);
+    pq.add_or_update(6, 5);
+
+    std::vector<int> expected_elements = {4, 3, 2, 1, 5, 6};
+    std::vector<int> actual_elements;
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+
+    int removed = 2;
+    pq.remove(removed);
+
+    expected_elements = {4, 3, 1, 5, 6};
+    actual_elements.clear();
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+
+    pq.add_or_update(4, 1);
+
+    expected_elements = {3, 1, 5, 6, 4};
+    actual_elements.clear();
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+}
+
+TEST_F(IndexedPriorityQueueTest, test_low_to_high) {
+    IndexedPriorityQueue<int, 
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> pq;
+
+    pq.add_or_update(3, 10);
+    pq.add_or_update(1, 5);
+    pq.add_or_update(4, 15);
+    pq.add_or_update(2, 8);
+    pq.add_or_update(5, 5);
+    pq.add_or_update(6, 5);
+
+    std::vector<int> expected_elements = {1, 5, 6, 2, 3, 4};
+    std::vector<int> actual_elements;
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+
+    int removed = 2;
+    pq.remove(removed);
+
+    expected_elements = {1, 5, 6, 3, 4};
+    actual_elements.clear();
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+
+    pq.add_or_update(4, 1);
+
+    expected_elements = {4, 1, 5, 6, 3};
+    actual_elements.clear();
+    for (auto& elem : pq) {
+        actual_elements.push_back(elem);
+    }
+    EXPECT_EQ(expected_elements, actual_elements);
+}
+
+} // namespace doris
diff --git a/be/test/vec/exec/skewed_partition_rebalancer_test.cpp 
b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp
new file mode 100644
index 00000000000..f5ce4d2bb8e
--- /dev/null
+++ b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is porting from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include <list>
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancerTest : public testing::Test {
+public:
+    SkewedPartitionRebalancerTest() = default;
+    virtual ~SkewedPartitionRebalancerTest() = default;
+
+private:
+    std::vector<std::list<int>> _get_partition_positions(
+            std::unique_ptr<SkewedPartitionRebalancer>& rebalancer,
+            std::vector<long>& partition_row_count, int partition_count, int 
max_position) {
+        std::vector<std::list<int>> 
partitionPositions(rebalancer->get_task_count());
+
+        for (int partition = 0; partition < rebalancer->get_task_count(); 
partition++) {
+            partitionPositions[partition] = std::list<int>();
+        }
+
+        for (int position = 0; position < max_position; position++) {
+            int partition = position % partition_count;
+            partition = rebalancer->get_task_id(partition, 
partition_row_count[partition]++);
+            partitionPositions[partition].push_back(position);
+        }
+
+        return partitionPositions;
+    }
+
+    static bool _vectors_equal(const std::vector<std::list<int>>& vec1,
+                               const std::vector<std::list<int>>& vec2) {
+        if (vec1.size() != vec2.size()) {
+            return false;
+        }
+        for (size_t i = 0; i < vec1.size(); i++) {
+            if (vec1[i] != vec2[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    static bool _compare_vector_of_lists(const std::vector<std::list<int>>& 
expected,
+                                         const std::vector<std::list<int>>& 
actual) {
+        if (expected.size() != actual.size()) {
+            return false;
+        }
+
+        for (size_t i = 0; i < expected.size(); ++i) {
+            if (expected[i] != actual[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+};
+
+TEST_F(SkewedPartitionRebalancerTest, test_rebalance_with_skewness) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * 
MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
 // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(40 * MEGABYTE);
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3, 6, 9, 12, 15}, {1, 4, 7, 10, 13, 16}, {2, 5, 8, 11, 14}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, 
rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(20 * MEGABYTE);
+
+    // Rebalancing will happen since we crossed the data processed limit.
+    // Part0 -> Task1 (Bucket1), Part1 -> Task0 (Bucket1), Part2 -> Task0 
(Bucket2)
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 4, 6, 8, 10, 12, 14, 16}, {1, 3, 7, 9, 13, 15}, {5, 11}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1}, {1, 0}, {2, 0}},
+                                         
rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(200 * MEGABYTE);
+
+    // Rebalancing will happen
+    // Part0 -> Task2 (Bucket1), Part1 -> Task2 (Bucket2), Part2 -> Task1 
(Bucket2)
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 4, 9, 11, 13}, {1, 3, 5, 10, 12, 14}, {6, 7, 8, 15, 16}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1, 0, 2}, {2, 0, 1}},
+                                         
rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest, test_rebalance_without_skewness) {
+    const int partitionCount = 6;
+    const int taskCount = 3;
+    const int taskBucketCount = 2;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * 
MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
 // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 700);
+    rebalancer->add_partition_row_count(2, 600);
+    rebalancer->add_partition_row_count(3, 1000);
+    rebalancer->add_partition_row_count(4, 700);
+    rebalancer->add_partition_row_count(5, 600);
+
+    rebalancer->add_data_processed(500 * MEGABYTE);
+    // No rebalancing will happen since there is no skewness across task 
buckets
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}, {0}, {1}, {2}},
+                                         
rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       test_no_rebalance_when_data_written_is_less_than_the_rebalance_limit) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * 
MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
 // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(40 * MEGABYTE);
+    // No rebalancing will happen since we do not cross the max data processed 
limit of 50MB
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, 
rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       
test_no_rebalance_when_data_written_by_the_partition_is_less_than_writer_sacling_min_data_processed)
 {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * 
MEGABYTE; // 50MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
  // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 600);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // No rebalancing will happen since no partition has crossed the 
writerScalingMinDataProcessed limit of 50MB
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, 
rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       test_rebalance_partition_to_single_task_in_a_rebalancing_loop) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * 
MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
 // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // rebalancing will only happen to a single task even though two tasks are 
available
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 6, 12}, {1, 3, 4, 7, 9, 10, 13, 15, 16}, {2, 5, 8, 11, 14}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(
+            _compare_vector_of_lists({{0, 1}, {1}, {2}}, 
rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 9}, {1, 3, 4, 7, 10, 12, 13, 16}, {2, 5, 6, 8, 11, 14, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1}, {2}},
+                                         
rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest, 
test_consider_skewed_partition_only_within_a_cycle) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 1;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * 
MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;         
 // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, 
taskBucketCount,
+                                          
MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          
MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 800);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // rebalancing will happen for partition 0 to task 2 since partition 0 is 
skewed.
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 6, 12}, {1, 4, 7, 10, 13, 16}, {2, 3, 5, 8, 9, 11, 14, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(
+            _compare_vector_of_lists({{0, 2}, {1}, {2}}, 
rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 0);
+    rebalancer->add_partition_row_count(1, 800);
+    rebalancer->add_partition_row_count(2, 1000);
+    // rebalancing will happen for partition 2 to task 0 since partition 2 is 
skewed. Even though partition 1 has
+    // written more amount of data from start, it will not be considered since 
it is not the most skewed in
+    // this rebalancing cycle.
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 6, 8, 12, 14}, {1, 4, 7, 10, 13, 16}, {3, 5, 9, 11, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, 
partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 2}, {1}, {2, 0}},
+                                         
rebalancer->get_partition_assignments()));
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/vhive_utils_test.cpp 
b/be/test/vec/exec/vhive_utils_test.cpp
new file mode 100644
index 00000000000..d14a004e50b
--- /dev/null
+++ b/be/test/vec/exec/vhive_utils_test.cpp
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/vhive_utils.h"
+
+#include <gtest/gtest.h>
+
+namespace doris::vectorized {
+
+class VHiveUtilsTest : public testing::Test {
+public:
+    VHiveUtilsTest() = default;
+    virtual ~VHiveUtilsTest() = default;
+};
+
+TEST_F(VHiveUtilsTest, test_make_partition_name) {
+    {
+        std::vector<THiveColumn> columns;
+        THiveColumn column1;
+        column1.name = "abc";
+        columns.emplace_back(std::move(column1));
+        std::vector<int> partition_columns_input_index = {0};
+        EXPECT_EQ("abc=xyz",
+                  VHiveUtils::make_partition_name(columns, 
partition_columns_input_index, {"xyz"}));
+    }
+
+    {
+        std::vector<THiveColumn> columns;
+        THiveColumn column1;
+        column1.name = "abc:qqq";
+        columns.emplace_back(std::move(column1));
+        std::vector<int> partition_columns_input_index = {0};
+        EXPECT_EQ("abc%3Aqqq=xyz%2Fyyy%3Dzzz",
+                  VHiveUtils::make_partition_name(columns, 
partition_columns_input_index,
+                                                  {"xyz/yyy=zzz"}));
+    }
+
+    {
+        std::vector<THiveColumn> columns;
+        THiveColumn column1;
+        column1.name = "abc";
+        columns.emplace_back(std::move(column1));
+        THiveColumn column2;
+        column2.name = "def";
+        columns.emplace_back(std::move(column2));
+        THiveColumn column3;
+        column3.name = "xyz";
+        columns.emplace_back(std::move(column3));
+        std::vector<int> partition_columns_input_index = {0, 1, 2};
+        EXPECT_EQ("abc=qqq/def=rrr/xyz=sss",
+                  VHiveUtils::make_partition_name(columns, 
partition_columns_input_index,
+                                                  {"qqq", "rrr", "sss"}));
+    }
+}
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to