This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8930df3b31d [Feature](iceberg-writer) Implements iceberg partition transform.  (#37692)
8930df3b31d is described below

commit 8930df3b31d25a4b2c85e3661c1afe520e74c7df
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Sat Jul 13 16:07:50 2024 +0800

    [Feature](iceberg-writer) Implements iceberg partition transform.  (#37692)
    
    ## Proposed changes
    
    Cherry-pick iceberg partition transform functionality. #36289 #36889
    
    ---------
    
    Co-authored-by: kang <35803862+ghkan...@users.noreply.github.com>
    Co-authored-by: lik40 <li...@chinatelecom.cn>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
    Co-authored-by: Mingyu Chen <morning...@163.com>
---
 be/src/util/bit_util.h                             |   22 +
 .../sink/writer/iceberg/partition_transformers.cpp |  172 ++-
 .../sink/writer/iceberg/partition_transformers.h   | 1274 +++++++++++++++++++-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   22 +-
 .../writer/iceberg/partition_transformers_test.cpp |  489 ++++++++
 .../apache/doris/common/info/SimpleTableInfo.java  |   66 +
 .../datasource/iceberg/IcebergMetadataCache.java   |   19 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |    4 +
 .../datasource/iceberg/IcebergTransaction.java     |  211 ++--
 .../doris/datasource/iceberg/IcebergUtils.java     |   64 +-
 .../iceberg/helper/IcebergWriterHelper.java        |   91 ++
 .../iceberg/source/IcebergApiSource.java           |    2 +-
 .../iceberg/source/IcebergHMSSource.java           |    4 +-
 .../datasource/statistics/CommonStatistics.java    |   81 ++
 .../commands/insert/IcebergInsertExecutor.java     |   28 +-
 .../org/apache/doris/planner/IcebergTableSink.java |    2 +-
 .../transaction/IcebergTransactionManager.java     |    7 +-
 .../datasource/iceberg/IcebergTransactionTest.java |  139 ++-
 18 files changed, 2468 insertions(+), 229 deletions(-)

diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
         return (v << n) >> n;
     }
 
+    /// Serializes an integer as its shortest big-endian two's-complement byte string.
+    ///
+    /// Bytes are collected from least to most significant and the buffer is reversed at
+    /// the end, so the returned string starts with the most significant byte. Leading
+    /// bytes that merely repeat the sign (0x00 for non-negative values, 0xFF for negative
+    /// values) are dropped; at least one byte is always produced.
+    ///
+    /// @tparam T integer type supporting `&`, `>>=` and comparison with 0 and -1.
+    /// @param input value to serialize.
+    /// @return between 1 and sizeof(T) bytes, most significant byte first.
+    template <typename T>
+    static std::string IntToByteBuffer(T input) {
+        std::string buffer;
+        T value = input;
+        for (size_t i = 0; i < sizeof(value); ++i) {
+            // Take the lowest byte as an unsigned quantity so the sign test below does not
+            // depend on whether plain `char` is signed on the target platform.
+            const auto low_byte = static_cast<unsigned char>(value & 0xFF);
+            buffer.push_back(static_cast<char>(low_byte));
+            // Remove the byte just emitted so that we can exit early if nothing but sign
+            // extension is left to process.
+            value >>= 8;
+            const bool sign_bit_set = (low_byte & 0x80) != 0;
+            // Non-negative remainder fully consumed and the emitted byte reads as positive.
+            if (value == 0 && !sign_bit_set) {
+                break;
+            }
+            // Negative remainder is all ones and the emitted byte already reads as negative.
+            if (value == -1 && sign_bit_set) {
+                break;
+            }
+        }
+        std::reverse(buffer.begin(), buffer.end());
+        return buffer;
+    }
+
     // Returns ceil(log2(x)).
     // TODO: this could be faster if we use __builtin_clz.  Fix this if this 
ever shows up
     // in a hot path.
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp 
b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
index 0faebea6295..ee8268d30f7 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
@@ -25,31 +25,109 @@
 namespace doris {
 namespace vectorized {
 
+// Calendar day 1970-01-01 expressed as a sys_days time point.
+const std::chrono::sys_days PartitionColumnTransformUtils::EPOCH = std::chrono::sys_days(
+        std::chrono::year_month_day(std::chrono::year(1970), std::chrono::January,
+                                    std::chrono::day(1)));
+
 std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
         const doris::iceberg::PartitionField& field, const TypeDescriptor& 
source_type) {
     auto& transform = field.transform();
-    static const std::regex hasWidth(R"((\w+)\[(\d+)\])");
+    static const std::regex has_width(R"((\w+)\[(\d+)\])");
     std::smatch width_match;
 
-    if (std::regex_match(transform, width_match, hasWidth)) {
+    if (std::regex_match(transform, width_match, has_width)) {
         std::string name = width_match[1];
-        //int parsed_width = std::stoi(width_match[2]);
+        int parsed_width = std::stoi(width_match[2]);
 
         if (name == "truncate") {
             switch (source_type.type) {
+            case TYPE_INT: {
+                return 
std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type,
+                                                                               
  parsed_width);
+            }
+            case TYPE_BIGINT: {
+                return 
std::make_unique<BigintTruncatePartitionColumnTransform>(source_type,
+                                                                               
 parsed_width);
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR:
+            case TYPE_STRING: {
+                return 
std::make_unique<StringTruncatePartitionColumnTransform>(source_type,
+                                                                               
 parsed_width);
+            }
+            case TYPE_DECIMALV2: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL32: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL64: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL128I: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL256: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>(
+                        source_type, parsed_width);
+            }
             default: {
-                throw doris::Exception(
-                        doris::ErrorCode::INTERNAL_ERROR,
-                        "Unsupported type for truncate partition column 
transform {}",
-                        source_type.debug_string());
+                throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                       "Unsupported type {} for partition 
column transform {}",
+                                       source_type.debug_string(), transform);
             }
             }
         } else if (name == "bucket") {
             switch (source_type.type) {
+            case TYPE_INT: {
+                return 
std::make_unique<IntBucketPartitionColumnTransform>(source_type,
+                                                                           
parsed_width);
+            }
+            case TYPE_BIGINT: {
+                return 
std::make_unique<BigintBucketPartitionColumnTransform>(source_type,
+                                                                              
parsed_width);
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR:
+            case TYPE_STRING: {
+                return 
std::make_unique<StringBucketPartitionColumnTransform>(source_type,
+                                                                              
parsed_width);
+            }
+            case TYPE_DATEV2: {
+                return 
std::make_unique<DateBucketPartitionColumnTransform>(source_type,
+                                                                            
parsed_width);
+            }
+            case TYPE_DATETIMEV2: {
+                return 
std::make_unique<TimestampBucketPartitionColumnTransform>(source_type,
+                                                                               
  parsed_width);
+            }
+            case TYPE_DECIMALV2: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL32: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL64: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL128I: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL256: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>(
+                        source_type, parsed_width);
+            }
             default: {
                 throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                                       "Unsupported type for bucket partition 
column transform {}",
-                                       source_type.debug_string());
+                                       "Unsupported type {} for partition 
column transform {}",
+                                       source_type.debug_string(), transform);
             }
             }
         }
@@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform> 
PartitionColumnTransforms::create(
 
     if (transform == "identity") {
         return std::make_unique<IdentityPartitionColumnTransform>(source_type);
+    } else if (transform == "year") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateYearPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampYearPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "month") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateMonthPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampMonthPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "day") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateDayPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampDayPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "hour") {
+        switch (source_type.type) {
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampHourPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "void") {
+        return std::make_unique<VoidPartitionColumnTransform>(source_type);
     } else {
         throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                               "Unsupported partition column transform: {}.", 
transform);
+                               "Unsupported type {} for partition column 
transform {}",
+                               source_type.debug_string(), transform);
     }
 }
 
+// Name reported by the base class; name() is virtual, so subclasses may replace it.
+std::string PartitionColumnTransform::name() const {
+    return std::string("default");
+}
+
 std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& 
type,
                                                       const std::any& value) 
const {
+    return get_partition_value(type, value);
+}
+
+std::string PartitionColumnTransform::get_partition_value(const 
TypeDescriptor& type,
+                                                          const std::any& 
value) const {
     if (value.has_value()) {
         switch (type.type) {
         case TYPE_BOOLEAN: {
@@ -131,19 +274,12 @@ std::string 
PartitionColumnTransform::to_human_string(const TypeDescriptor& type
         }
         default: {
             throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                                   "Unsupported partition column transform: 
{}",
-                                   type.debug_string());
+                                   "Unsupported type {} for partition", 
type.debug_string());
         }
         }
     }
     return "null";
 }
 
-ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, 
int idx) {
-    const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(idx);
-    return {column_with_type_and_name.column, column_with_type_and_name.type,
-            column_with_type_and_name.name};
-}
-
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h 
b/be/src/vec/sink/writer/iceberg/partition_transformers.h
index 13c6238d1db..304c5834844 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.h
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h
@@ -45,25 +45,77 @@ public:
             const doris::iceberg::PartitionField& field, const TypeDescriptor& 
source_type);
 };
 
+/// Stateless helpers for the temporal partition transforms: cached epoch values and
+/// formatting of epoch-relative ordinals (years, months, days, hours since 1970-01-01)
+/// into human-readable strings. The class cannot be instantiated.
+class PartitionColumnTransformUtils {
+public:
+    /// Returns a shared DateV2 value parsed from "1970-01-01 00:00:00".
+    static DateV2Value<DateV2ValueType>& epoch_date() {
+        // A function-local static with an initializer is constructed exactly once and
+        // concurrent first callers block until it is ready, so no hand-rolled
+        // "initialized" flag (which would be a data race) is needed.
+        static DateV2Value<DateV2ValueType> epoch_date = [] {
+            DateV2Value<DateV2ValueType> value;
+            value.from_date_str("1970-01-01 00:00:00", 19);
+            return value;
+        }();
+        return epoch_date;
+    }
+
+    /// Returns a shared DateTimeV2 value parsed from "1970-01-01 00:00:00".
+    static DateV2Value<DateTimeV2ValueType>& epoch_datetime() {
+        // Same once-only, thread-safe initialization as epoch_date().
+        static DateV2Value<DateTimeV2ValueType> epoch_datetime = [] {
+            DateV2Value<DateTimeV2ValueType> value;
+            value.from_date_str("1970-01-01 00:00:00", 19);
+            return value;
+        }();
+        return epoch_datetime;
+    }
+
+    /// Formats a year ordinal (years since 1970) as the calendar year, e.g. 54 -> "2024".
+    static std::string human_year(int year_ordinal) {
+        auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::years(year_ordinal);
+        return std::to_string(static_cast<int>(ymd.year()));
+    }
+
+    /// Formats a month ordinal (months since 1970-01) as "YYYY-MM".
+    static std::string human_month(int month_ordinal) {
+        auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::months(month_ordinal);
+        return fmt::format("{:04d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()));
+    }
+
+    /// Formats a day ordinal (days since 1970-01-01) as "YYYY-MM-DD".
+    static std::string human_day(int day_ordinal) {
+        // EPOCH already has day precision, so the sum is a sys_days and needs no flooring.
+        const std::chrono::year_month_day ymd(EPOCH + std::chrono::days(day_ordinal));
+        return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()));
+    }
+
+    /// Formats an hour ordinal (hours since 1970-01-01 00:00) as "YYYY-MM-DD-HH".
+    ///
+    /// Ordinals before the epoch are handled with floored division, so -1 yields
+    /// "1969-12-31-23" rather than a negative hour on 1970-01-01.
+    static std::string human_hour(int hour_ordinal) {
+        int day_value = hour_ordinal / 24;
+        int hour_value = hour_ordinal % 24;
+        // C++ division truncates toward zero; shift a negative remainder into the
+        // previous day so that the hour always lies in [0, 23].
+        if (hour_value < 0) {
+            hour_value += 24;
+            --day_value;
+        }
+        const std::chrono::year_month_day ymd(EPOCH + std::chrono::days(day_value));
+        return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()),
+                           hour_value);
+    }
+
+private:
+    // Day-precision time point that the human_* helpers offset from; defined out of line.
+    static const std::chrono::sys_days EPOCH;
+    PartitionColumnTransformUtils() = default;
+};
+
 class PartitionColumnTransform {
 public:
     PartitionColumnTransform() = default;
 
     virtual ~PartitionColumnTransform() = default;
 
-    virtual bool preserves_non_null() const { return false; }
-
-    virtual bool monotonic() const { return true; }
-
-    virtual bool temporal() const { return false; }
+    virtual std::string name() const;
 
     virtual const TypeDescriptor& get_result_type() const = 0;
 
-    virtual bool is_void() const { return false; }
-
-    virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0;
+    virtual ColumnWithTypeAndName apply(const Block& block, int column_pos) = 
0;
 
     virtual std::string to_human_string(const TypeDescriptor& type, const 
std::any& value) const;
+
+    virtual std::string get_partition_value(const TypeDescriptor& type,
+                                            const std::any& value) const;
 };
 
 class IdentityPartitionColumnTransform : public PartitionColumnTransform {
@@ -71,12 +123,1214 @@ public:
     IdentityPartitionColumnTransform(const TypeDescriptor& source_type)
             : _source_type(source_type) {}
 
-    virtual const TypeDescriptor& get_result_type() const { return 
_source_type; }
+    std::string name() const override { return "Identity"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_source_type; }
+
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        return {column_with_type_and_name.column, 
column_with_type_and_name.type,
+                column_with_type_and_name.name};
+    }
+
+private:
+    TypeDescriptor _source_type;
+};
+
+/// `truncate[W]` partition transform for string columns: keeps the leading part of each
+/// value by running the substring routine with start position 1 and length W.
+class StringTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    /// @param source_type type of the partitioned column; also reported as the result type.
+    /// @param width length argument passed to the substring routine.
+    StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "StringTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    /// Builds the truncated partition column for column `column_pos` of `block`.
+    ///
+    /// @return the substring result under the source column's name; when the source column
+    ///         is nullable the result is wrapped as nullable with the source's null map.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        auto int_type = std::make_shared<DataTypeInt32>();
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+
+        // Remember the null map of a nullable input so it can be re-attached to the result.
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (auto* nullable_column =
+                    check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            is_nullable = true;
+        }
+
+        // Create a temp_block laid out as [str, pos, len, result] to execute substring.
+        // NOTE(review): the source column is inserted as-is, including any nullable
+        // wrapper; this assumes substring_execute accepts such input -- confirm.
+        Block temp_block;
+        temp_block.insert(column_with_type_and_name);
+        const auto row_count = temp_block.rows();
+        temp_block.insert(
+                {int_type->create_column_const(row_count, to_field(1)), int_type, "const 1"});
+        temp_block.insert({int_type->create_column_const(row_count, to_field(_width)), int_type,
+                           fmt::format("const {}", _width)});
+        temp_block.insert({nullptr, std::make_shared<DataTypeString>(), "result"});
+        ColumnNumbers temp_arguments(3);
+        temp_arguments[0] = 0; // str column
+        temp_arguments[1] = 1; // pos
+        temp_arguments[2] = 2; // width
+        size_t result_column_id = 3;
+
+        SubstringUtil::substring_execute(temp_block, temp_arguments, result_column_id,
+                                         temp_block.rows());
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(
+                    temp_block.get_by_position(result_column_id).column, null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            auto res_column = temp_block.get_by_position(result_column_id).column;
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+/// `truncate[W]` partition transform for 32-bit integer columns.
+///
+/// Each value v becomes v - (((v % W) + W) % W), i.e. v rounded toward negative infinity
+/// to a multiple of W (for positive W).
+class IntegerTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    /// @param source_type type of the partitioned column; also reported as the result type.
+    /// @param width truncation width W. NOTE(review): a width of 0 would divide by zero in
+    ///        apply(); presumably the partition spec is validated upstream -- confirm.
+    IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "IntegerTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    /// Builds the truncated partition column for column `column_pos` of `block`.
+    ///
+    /// @return an Int32 column under the source column's name; nullable (sharing the
+    ///         source's null map) exactly when the source column is nullable.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        //1) get the target column ptr, materializing a constant column if needed
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) peel off the nullable wrapper, keeping its null map for the result
+        ColumnPtr null_map_column_ptr;
+        const bool is_nullable = column_ptr->is_nullable();
+        if (is_nullable) {
+            // Checked downcast (same helper as used for the nested column below) instead
+            // of an unchecked reinterpret_cast.
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int32* __restrict p_in = in_data.data();
+        const Int32* end_in = p_in + in_data.size();
+        Int32* __restrict p_out = out_data.data();
+
+        for (; p_in < end_in; ++p_in, ++p_out) {
+            // The double modulo turns a possibly negative remainder into one in [0, W).
+            *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+/// `truncate[W]` partition transform for 64-bit integer columns.
+///
+/// Each value v becomes v - (((v % W) + W) % W), i.e. v rounded toward negative infinity
+/// to a multiple of W (for positive W).
+class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    /// @param source_type type of the partitioned column; also reported as the result type.
+    /// @param width truncation width W. NOTE(review): a width of 0 would divide by zero in
+    ///        apply(); presumably the partition spec is validated upstream -- confirm.
+    BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "BigintTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    /// Builds the truncated partition column for column `column_pos` of `block`.
+    ///
+    /// @return an Int64 column under the source column's name; nullable (sharing the
+    ///         source's null map) exactly when the source column is nullable.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        //1) get the target column ptr, materializing a constant column if needed
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) peel off the nullable wrapper, keeping its null map for the result
+        ColumnPtr null_map_column_ptr;
+        const bool is_nullable = column_ptr->is_nullable();
+        if (is_nullable) {
+            // Checked downcast (same helper as used for the nested column below) instead
+            // of an unchecked reinterpret_cast.
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt64::create();
+        ColumnInt64::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int64* __restrict p_in = in_data.data();
+        const Int64* end_in = p_in + in_data.size();
+        Int64* __restrict p_out = out_data.data();
+
+        for (; p_in < end_in; ++p_in, ++p_out) {
+            // The double modulo turns a possibly negative remainder into one in [0, W).
+            *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+/// `truncate[W]` partition transform for decimal columns.
+///
+/// Operates on the stored native integer of each decimal: a value v becomes
+/// v - (((v % W) + W) % W). The result column keeps the scale of the source column.
+///
+/// @tparam T decimal type whose `NativeType` is the underlying integer representation.
+template <typename T>
+class DecimalTruncatePartitionColumnTransform : public PartitionColumnTransform {
+public:
+    /// @param source_type type of the partitioned column; also reported as the result type.
+    /// @param width truncation width W. NOTE(review): a width of 0 would divide by zero in
+    ///        apply(); presumably the partition spec is validated upstream -- confirm.
+    DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "DecimalTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    /// Builds the truncated partition column for column `column_pos` of `block`.
+    ///
+    /// @return a decimal column under the source column's name; nullable (sharing the
+    ///         source's null map) exactly when the source column is nullable.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        using NativeType = typename T::NativeType;
+
+        // Materialize constant columns first, as the integer transforms do; otherwise the
+        // typed lookups below would not find the decimal column.
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        // Peel off the nullable wrapper, keeping its null map for the result.
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (auto* nullable_column = check_and_get_column<ColumnNullable>(column_ptr)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+            is_nullable = true;
+        }
+
+        // Fail with a diagnosable check rather than a null dereference when the column is
+        // not the decimal type this transform was instantiated for.
+        const auto* const decimal_col = check_and_get_column<ColumnDecimal<T>>(column_ptr);
+        CHECK(decimal_col != nullptr);
+        const auto& vec_src = decimal_col->get_data();
+
+        auto col_res = ColumnDecimal<T>::create(vec_src.size(), decimal_col->get_scale());
+        auto& vec_res = col_res->get_data();
+
+        // Work on the underlying integers of the decimal values.
+        const NativeType* __restrict p_in = reinterpret_cast<const NativeType*>(vec_src.data());
+        const NativeType* end_in = p_in + vec_src.size();
+        NativeType* __restrict p_out = reinterpret_cast<NativeType*>(vec_res.data());
+
+        for (; p_in < end_in; ++p_in, ++p_out) {
+            // The double modulo turns a possibly negative remainder into one in [0, W).
+            NativeType remainder = ((*p_in % _width) + _width) % _width;
+            *p_out = *p_in - remainder;
+        }
+
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+/// `bucket[N]` partition transform for 32-bit integer columns.
+///
+/// Each value is widened to 64 bits, hashed with 32-bit murmur3 (seed 0) over those
+/// 8 bytes, the sign bit of the hash is cleared, and the result is reduced modulo N.
+/// The result column type is always TYPE_INT.
+class IntBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    /// @param source_type type of the partitioned column.
+    /// @param bucket_num number of buckets N. NOTE(review): 0 would divide by zero in
+    ///        apply(); presumably the partition spec is validated upstream -- confirm.
+    IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "IntBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    /// Builds the bucket-number column for column `column_pos` of `block`.
+    ///
+    /// @return an Int32 column under the source column's name; nullable (sharing the
+    ///         source's null map) exactly when the source column is nullable.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        //1) get the target column ptr, materializing a constant column if needed
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) peel off the nullable wrapper, keeping its null map for the result
+        ColumnPtr null_map_column_ptr;
+        const bool is_nullable = column_ptr->is_nullable();
+        if (is_nullable) {
+            // Checked downcast (same helper as used for the nested column below) instead
+            // of an unchecked reinterpret_cast.
+            const auto* nullable_column = assert_cast<const ColumnNullable*>(column_ptr.get());
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int32* __restrict p_in = in_data.data();
+        const Int32* end_in = p_in + in_data.size();
+        Int32* __restrict p_out = out_data.data();
+
+        for (; p_in < end_in; ++p_in, ++p_out) {
+            // Hash the value as a 64-bit integer, not as its 4-byte representation.
+            Int64 long_value = static_cast<Int64>(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+            // Masking with INT32_MAX clears the sign bit so the modulo is non-negative.
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+// Bucket partition transform for BIGINT columns.
+//
+// Every 64-bit value is hashed with murmur_hash3_32 (seed 0); the top bit of the hash is
+// cleared and the remainder modulo the bucket count becomes the INT partition value.
+// A nullable input yields a nullable result that reuses the input's null map.
+// NOTE(review): bucket_num is used as a divisor and is not validated in this class;
+// presumably callers always pass a positive value - confirm.
+class BigintBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "BigintBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the bucket id of every row of the column at `column_pos` in `block`.
+    // Returns the bucket column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnInt64*>(values_column.get())->get_data();
+
+        // Hash every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto buckets = ColumnInt32::create();
+        ColumnInt32::Container& out_data = buckets->get_data();
+        out_data.resize(row_count);
+        const Int64* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            Int64 key = src[row];
+            uint32_t hash = HashUtil::murmur_hash3_32(&key, sizeof(key), 0);
+            // Clearing the top bit keeps the stored Int32 non-negative.
+            dst[row] = (hash & INT32_MAX) % _bucket_num;
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(buckets), result_type, source.name};
+        }
+        auto nullable_buckets = ColumnNullable::create(std::move(buckets), null_map);
+        return {std::move(nullable_buckets), result_type, source.name};
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being bucketed
+    int _bucket_num;             // divisor applied to the masked hash
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Bucket partition transform for decimal columns stored as ColumnDecimal<T>.
+//
+// Each value's native integer (T::NativeType) is serialized with
+// BitUtil::IntToByteBuffer, the bytes are hashed with murmur_hash3_32 (seed 0), the top
+// bit of the hash is cleared and the remainder modulo the bucket count becomes the INT
+// partition value. A nullable input yields a nullable result that reuses the input's
+// null map.
+// NOTE(review): bucket_num is used as a divisor and is not validated in this class;
+// presumably callers always pass a positive value - confirm.
+template <typename T>
+class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DecimalBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the bucket id of every row of the column at `column_pos` in `block`.
+    // Returns the bucket column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDecimal<T>*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        const size_t row_count = in_data.size();
+        out_data.resize(row_count);
+
+        using Native = typename T::NativeType;
+        const Native* __restrict p_in = reinterpret_cast<const Native*>(in_data.data());
+        // The result column is ColumnInt32 whatever the width of T::NativeType, so the
+        // destination must be addressed as Int32. A NativeType-typed pointer would store
+        // and advance 8 or 16 bytes per row for wider decimals and run past the buffer.
+        Int32* __restrict p_out = out_data.data();
+
+        for (size_t row = 0; row < row_count; ++row) {
+            std::string buffer = BitUtil::IntToByteBuffer(p_in[row]);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(), buffer.size(), 0);
+            // Clearing the top bit keeps the stored Int32 non-negative.
+            p_out[row] = (hash_value & INT32_MAX) % _bucket_num;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    // Human-readable form of a bucket value; identical to get_partition_value().
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    // Renders the Int32 bucket id held in `value` as decimal text, or "null" when
+    // `value` is empty. `type` is not consulted.
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being bucketed
+    int _bucket_num;             // divisor applied to the masked hash
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Bucket partition transform for DATEV2 columns.
+//
+// Each date is converted to a day count by subtracting 719528 from daynr(), widened to
+// 64 bits and hashed with murmur_hash3_32 (seed 0); the top bit of the hash is cleared
+// and the remainder modulo the bucket count becomes the INT partition value.
+// A nullable input yields a nullable result that reuses the input's null map.
+// NOTE(review): bucket_num is used as a divisor and is not validated in this class;
+// presumably callers always pass a positive value - confirm.
+class DateBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the bucket id of every row of the column at `column_pos` in `block`.
+    // Returns the bucket column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateV2*>(values_column.get())->get_data();
+
+        // Hash every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto buckets = ColumnInt32::create();
+        ColumnInt32::Container& out_data = buckets->get_data();
+        out_data.resize(row_count);
+        const UInt32* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(src[row]);
+
+            // 719528 rebases daynr() onto the Unix epoch (per the original local's name,
+            // days_from_unix_epoch); the day count is hashed as a 64-bit integer.
+            int32_t days_since_epoch = value.daynr() - 719528;
+            Int64 key = static_cast<Int64>(days_since_epoch);
+            uint32_t hash = HashUtil::murmur_hash3_32(&key, sizeof(key), 0);
+
+            // Clearing the top bit keeps the stored Int32 non-negative.
+            dst[row] = (hash & INT32_MAX) % _bucket_num;
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(buckets), result_type, source.name};
+        }
+        auto nullable_buckets = ColumnNullable::create(std::move(buckets), null_map);
+        return {std::move(nullable_buckets), result_type, source.name};
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being bucketed
+    int _bucket_num;             // divisor applied to the masked hash
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Bucket partition transform for DATETIMEV2 columns.
+//
+// Each value is converted with unix_timestamp(..., "UTC"), multiplied by 1000000 and
+// hashed as a 64-bit integer with murmur_hash3_32 (seed 0); the top bit of the hash is
+// cleared and the remainder modulo the bucket count becomes the INT partition value.
+// A nullable input yields a nullable result that reuses the input's null map.
+// NOTE(review): only the value returned by unix_timestamp() is scaled; no fractional
+// second component is added before hashing - confirm that this is intended.
+// NOTE(review): bucket_num is used as a divisor and is not validated in this class;
+// presumably callers always pass a positive value - confirm.
+class TimestampBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the bucket id of every row of the column at `column_pos` in `block`.
+    // Returns the bucket column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data =
+                assert_cast<const ColumnDateTimeV2*>(values_column.get())->get_data();
+
+        // Hash every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto buckets = ColumnInt32::create();
+        ColumnInt32::Container& out_data = buckets->get_data();
+        out_data.resize(row_count);
+        const UInt64* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(src[row]);
+
+            // A failed conversion is logged and the row is hashed as timestamp 0.
+            int64_t timestamp;
+            if (!value.unix_timestamp(&timestamp, "UTC")) {
+                LOG(WARNING) << "Failed to call unix_timestamp :" << value.debug_string();
+                timestamp = 0;
+            }
+            Int64 key = static_cast<Int64>(timestamp) * 1000000;
+            uint32_t hash = HashUtil::murmur_hash3_32(&key, sizeof(key), 0);
+
+            // Clearing the top bit keeps the stored Int32 non-negative.
+            dst[row] = (hash & INT32_MAX) % _bucket_num;
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(buckets), result_type, source.name};
+        }
+        auto nullable_buckets = ColumnNullable::create(std::move(buckets), null_map);
+        return {std::move(nullable_buckets), result_type, source.name};
+    }
+
+    // Renders the Int32 bucket id held in `value` as decimal text, or "null" when
+    // `value` is empty. `type` is not consulted.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return std::to_string(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being bucketed
+    int _bucket_num;             // divisor applied to the masked hash
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Bucket partition transform for string columns stored as ColumnString.
+//
+// The raw bytes of each row are hashed with murmur_hash3_32 (seed 0); the top bit of
+// the hash is cleared and the remainder modulo the bucket count becomes the INT
+// partition value. A nullable input yields a nullable result that reuses the input's
+// null map.
+// NOTE(review): bucket_num is used as a divisor and is not validated in this class;
+// presumably callers always pass a positive value - confirm.
+class StringBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    StringBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "StringBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the bucket id of every row of the column at `column_pos` in `block`.
+    // Returns the bucket column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto* str_col = assert_cast<const ColumnString*>(column_ptr.get());
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        const auto& data = str_col->get_chars();
+        const auto& offsets = str_col->get_offsets();
+
+        const size_t row_count = offsets.size();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(row_count);
+        auto* __restrict p_out = out_data.data();
+
+        // offsets[row] is used as the end of row `row`, and each row starts where the
+        // previous one ended. The start of row 0 is carried explicitly as 0 instead of
+        // being read from offsets[-1], and the index is size_t to match offsets.size().
+        size_t row_begin = 0;
+        for (size_t row = 0; row < row_count; ++row) {
+            const size_t row_end = offsets[row];
+            const unsigned char* raw_str = &data[row_begin];
+            ColumnString::Offset size = row_end - row_begin;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0);
+
+            // Clearing the top bit keeps the stored Int32 non-negative.
+            p_out[row] = (hash_value & INT32_MAX) % _bucket_num;
+            row_begin = row_end;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being bucketed
+    int _bucket_num;             // divisor applied to the masked hash
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Year partition transform for DATEV2 columns.
+//
+// The INT partition value of a row is datetime_diff<YEAR> between
+// PartitionColumnTransformUtils::epoch_date() and the row's date. A nullable input
+// yields a nullable result that reuses the input's null map.
+class DateYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the year value of every row of the column at `column_pos` in `block`.
+    // Returns the result column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateV2*>(values_column.get())->get_data();
+
+        // Transform every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto years = ColumnInt32::create();
+        ColumnInt32::Container& out_data = years->get_data();
+        out_data.resize(row_count);
+        const UInt32* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(src[row]);
+            dst[row] = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value);
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(years), result_type, source.name};
+        }
+        auto nullable_years = ColumnNullable::create(std::move(years), null_map);
+        return {std::move(nullable_years), result_type, source.name};
+    }
+
+    // Formats the Int32 held in `value` with PartitionColumnTransformUtils::human_year,
+    // or returns "null" when `value` is empty. `type` is not consulted.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being transformed
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Year partition transform for DATETIMEV2 columns.
+//
+// The INT partition value of a row is datetime_diff<YEAR> between
+// PartitionColumnTransformUtils::epoch_datetime() and the row's datetime. A nullable
+// input yields a nullable result that reuses the input's null map.
+class TimestampYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the year value of every row of the column at `column_pos` in `block`.
+    // Returns the result column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data =
+                assert_cast<const ColumnDateTimeV2*>(values_column.get())->get_data();
+
+        // Transform every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto years = ColumnInt32::create();
+        ColumnInt32::Container& out_data = years->get_data();
+        out_data.resize(row_count);
+        const UInt64* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(src[row]);
+            dst[row] = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(years), result_type, source.name};
+        }
+        auto nullable_years = ColumnNullable::create(std::move(years), null_map);
+        return {std::move(nullable_years), result_type, source.name};
+    }
+
+    // Formats the Int32 held in `value` with PartitionColumnTransformUtils::human_year,
+    // or returns "null" when `value` is empty. `type` is not consulted.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being transformed
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Month partition transform for DATEV2 columns.
+//
+// The INT partition value of a row is datetime_diff<MONTH> between
+// PartitionColumnTransformUtils::epoch_date() and the row's date. A nullable input
+// yields a nullable result that reuses the input's null map.
+class DateMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the month value of every row of the column at `column_pos` in `block`.
+    // Returns the result column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateV2*>(values_column.get())->get_data();
+
+        // Transform every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto months = ColumnInt32::create();
+        ColumnInt32::Container& out_data = months->get_data();
+        out_data.resize(row_count);
+        const UInt32* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(src[row]);
+            dst[row] = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value);
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(months), result_type, source.name};
+        }
+        auto nullable_months = ColumnNullable::create(std::move(months), null_map);
+        return {std::move(nullable_months), result_type, source.name};
+    }
+
+    // Formats the Int32 held in `value` with PartitionColumnTransformUtils::human_month,
+    // or returns "null" when `value` is empty. `type` is not consulted.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being transformed
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Month partition transform for DATETIMEV2 columns.
+//
+// The INT partition value of a row is datetime_diff<MONTH> between
+// PartitionColumnTransformUtils::epoch_datetime() and the row's datetime. A nullable
+// input yields a nullable result that reuses the input's null map.
+class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the month value of every row of the column at `column_pos` in `block`.
+    // Returns the result column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data =
+                assert_cast<const ColumnDateTimeV2*>(values_column.get())->get_data();
+
+        // Transform every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto months = ColumnInt32::create();
+        ColumnInt32::Container& out_data = months->get_data();
+        out_data.resize(row_count);
+        const UInt64* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(src[row]);
+            dst[row] =
+                    datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value);
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(months), result_type, source.name};
+        }
+        auto nullable_months = ColumnNullable::create(std::move(months), null_map);
+        return {std::move(nullable_months), result_type, source.name};
+    }
+
+    // Formats the Int32 held in `value` with PartitionColumnTransformUtils::human_month,
+    // or returns "null" when `value` is empty. `type` is not consulted.
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being transformed
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+// Day partition transform for DATEV2 columns.
+//
+// The INT partition value of a row is datetime_diff<DAY> between
+// PartitionColumnTransformUtils::epoch_date() and the row's date. A nullable input
+// yields a nullable result that reuses the input's null map.
+class DateDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    // Computes the day value of every row of the column at `column_pos` in `block`.
+    // Returns the result column, its INT data type (nullable iff the input column is
+    // nullable) and the source column's name.
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override {
+        // Expand a constant column so its rows can be read as a flat array.
+        const ColumnWithTypeAndName& source = block.get_by_position(column_pos);
+        ColumnPtr full_column = source.column->convert_to_full_column_if_const();
+        CHECK(full_column != nullptr);
+
+        // Strip the nullable wrapper, keeping its null map for the result column.
+        ColumnPtr values_column = full_column;
+        ColumnPtr null_map;
+        const bool nullable_input = full_column->is_nullable();
+        if (nullable_input) {
+            const auto* wrapper =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(full_column.get());
+            null_map = wrapper->get_null_map_column_ptr();
+            values_column = wrapper->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const ColumnDateV2*>(values_column.get())->get_data();
+
+        // Transform every row of the nested data (rows flagged in the null map included).
+        const size_t row_count = in_data.size();
+        auto days = ColumnInt32::create();
+        ColumnInt32::Container& out_data = days->get_data();
+        out_data.resize(row_count);
+        const UInt32* __restrict src = in_data.data();
+        Int32* __restrict dst = out_data.data();
+        for (size_t row = 0; row < row_count; ++row) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(src[row]);
+            dst[row] = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value);
+        }
+
+        // Re-attach the null map when the input was nullable.
+        auto result_type =
+                DataTypeFactory::instance().create_data_type(get_result_type(), nullable_input);
+        if (!nullable_input) {
+            return {std::move(days), result_type, source.name};
+        }
+        auto nullable_days = ColumnNullable::create(std::move(days), null_map);
+        return {std::move(nullable_days), result_type, source.name};
+    }
+
+    // Human-readable form of a day value; identical to get_partition_value().
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    // Formats the Int32 held in `value` with PartitionColumnTransformUtils::human_day,
+    // or returns "null" when `value` is empty. `type` is not consulted.
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (!value.has_value()) {
+            return "null";
+        }
+        int day_value = std::any_cast<Int32>(value);
+        return PartitionColumnTransformUtils::human_day(day_value);
+    }
+
+private:
+    TypeDescriptor _source_type; // type of the column being transformed
+    TypeDescriptor _target_type; // always TYPE_INT
+};
+
+class TimestampDayPartitionColumnTransform : public PartitionColumnTransform { // "TimestampDay" transform: ColumnDateTimeV2 input -> Int32 day difference from PartitionColumnTransformUtils::epoch_datetime()
+public:
+    TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {} // the result type is always TYPE_INT, independent of source_type
+
+    std::string name() const override { return "TimestampDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override { // builds the partition column for block column `column_pos`; the result keeps the source column's name
+        //1) fetch the source column, expanding a const column into a full one
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) unwrap a nullable column: remember its null map and continue with the nested column
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data(); // the (nested) column must be a ColumnDateTimeV2
+
+        //3) compute one Int32 per input row: datetime_diff<DAY>(epoch, value)
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) { // every row is processed, including rows flagged NULL; the null map re-attached in step 4 masks those
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); // reinterpret the stored 64-bit value as a DateV2Value
+            *p_out = 
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value); // the unit test in this change expects 17486 for 2017-11-16 22:31:08
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) wrap the result with the original null map when the input was nullable, and return it
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        return get_partition_value(type, value); // same string as the partition value
+    }
+
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override { // `type` is not used by this override
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value)); // std::any_cast throws std::bad_any_cast unless the held type is exactly Int32
+        } else {
+            return "null"; // an empty std::any is rendered as the literal string "null"
+        }
+    }
+
+private:
+    TypeDescriptor _source_type; // copied from the constructor argument; not read anywhere in this class
+    TypeDescriptor _target_type; // always TYPE_INT (set in the constructor)
+};
+
+class TimestampHourPartitionColumnTransform : public PartitionColumnTransform { // "TimestampHour" transform: ColumnDateTimeV2 input -> Int32 hour difference from PartitionColumnTransformUtils::epoch_datetime()
+public:
+    TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {} // the result type is always TYPE_INT, independent of source_type
+
+    std::string name() const override { return "TimestampHour"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override { // builds the partition column for block column `column_pos`; the result keeps the source column's name
+        //1) fetch the source column, expanding a const column into a full one
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) unwrap a nullable column: remember its null map and continue with the nested column
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data(); // the (nested) column must be a ColumnDateTimeV2
+
+        //3) compute one Int32 per input row: datetime_diff<HOUR>(epoch, value)
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) { // every row is processed, including rows flagged NULL; the null map re-attached in step 4 masks those
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); // reinterpret the stored 64-bit value as a DateV2Value
+            *p_out = 
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value); // the unit test in this change expects 419686 for 2017-11-16 22:31:08
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) wrap the result with the original null map when the input was nullable, and return it
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override { // only to_human_string is overridden here; get_partition_value is not overridden in this class
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value)); // std::any_cast throws std::bad_any_cast unless the held type is exactly Int32
+        } else {
+            return "null"; // an empty std::any is rendered as the literal string "null"
+        }
+    }
+
+private:
+    TypeDescriptor _source_type; // copied from the constructor argument; not read anywhere in this class
+    TypeDescriptor _target_type; // always TYPE_INT (set in the constructor)
+};
+
+class VoidPartitionColumnTransform : public PartitionColumnTransform { // "Void" transform: returns the source data wrapped as a nullable column in which every row is marked NULL
+public:
+    VoidPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(source_type) {} // the result type is the same as the source type
+
+    std::string name() const override { return "Void"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(const Block& block, int column_pos) override { // the result is always nullable and keeps the source column's name
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
 
-    virtual ColumnWithTypeAndName apply(Block& block, int idx);
+        ColumnPtr column_ptr;
+        ColumnPtr null_map_column_ptr;
+        if (auto* nullable_column =
+                    
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr(); // NOTE(review): assigned here but never read again in this method
+            column_ptr = nullable_column->get_nested_column_ptr(); // drop the input's own null map; only the nested data is reused
+        } else {
+            column_ptr = column_with_type_and_name.column;
+        }
+        auto res_column = ColumnNullable::create(std::move(column_ptr), // NOTE(review): column_ptr is moved here and dereferenced in the next argument; safe only if create() binds its arguments by reference - confirm
+                                                 
ColumnUInt8::create(column_ptr->size(), 1)); // null map filled with 1 for every row
+        return {std::move(res_column),
+                
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                column_with_type_and_name.name};
+    }
 
 private:
     TypeDescriptor _source_type;
+    TypeDescriptor _target_type; // equal to the source type (set in the constructor)
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 2703330406c..e59b0593f7b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -329,8 +329,8 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
         TypeDescriptor result_type =
                 
iceberg_partition_column.partition_column_transform().get_result_type();
         partition_values.emplace_back(
-                
iceberg_partition_column.partition_column_transform().to_human_string(result_type,
-                                                                               
       data.get(i)));
+                
iceberg_partition_column.partition_column_transform().get_partition_value(
+                        result_type, data.get(i)));
     }
 
     return partition_values;
@@ -407,21 +407,25 @@ std::optional<PartitionData> 
VIcebergTableWriter::_get_partition_data(
 std::any VIcebergTableWriter::_get_iceberg_partition_value(
         const TypeDescriptor& type_desc, const ColumnWithTypeAndName& 
partition_column,
         int position) {
-    ColumnPtr column;
-    if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*partition_column.column)) {
+    //1) get the partition column ptr
+    ColumnPtr col_ptr = 
partition_column.column->convert_to_full_column_if_const();
+    CHECK(col_ptr != nullptr);
+    if (col_ptr->is_nullable()) {
+        const ColumnNullable* nullable_column =
+                reinterpret_cast<const 
vectorized::ColumnNullable*>(col_ptr.get());
         auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
         if (null_map_data[position]) {
             return std::any();
         }
-        column = nullable_column->get_nested_column_ptr();
-    } else {
-        column = partition_column.column;
+        col_ptr = nullable_column->get_nested_column_ptr();
     }
-    auto [item, size] = column->get_data_at(position);
+
+    //2) get partition field data from the partition block
+    auto [item, size] = col_ptr->get_data_at(position);
     switch (type_desc.type) {
     case TYPE_BOOLEAN: {
         vectorized::Field field =
-                vectorized::check_and_get_column<const 
ColumnUInt8>(*column)->operator[](position);
+                vectorized::check_and_get_column<const 
ColumnUInt8>(*col_ptr)->operator[](position);
         return field.get<bool>();
     }
     case TYPE_TINYINT: {
diff --git a/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp 
b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp
new file mode 100644
index 00000000000..a8df4f60d83
--- /dev/null
+++ b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/partition_transformers.h"
+
+#include <gtest/gtest.h>
+
+#include "vec/data_types/data_type_time_v2.h"
+
+namespace doris::vectorized {
+
+class PartitionTransformersTest : public testing::Test { // gtest fixture with no members; it only groups the TEST_F cases of this file
+public:
+    PartitionTransformersTest() = default;
+    virtual ~PartitionTransformersTest() = default;
+};
+
+TEST_F(PartitionTransformersTest, test_integer_truncate_transform) { // truncate with width 10 on a non-nullable INT column
+    const std::vector<int32_t> values({1, -1});
+    auto column = ColumnInt32::create();
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_int(column->get_ptr(), 
std::make_shared<DataTypeInt32>(),
+                                   "test_int");
+
+    Block block({test_int});
+    TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+    IntegerTruncatePartitionColumnTransform transform(source_type, 10);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {0, -10}; // 1 -> 0 and -1 -> -10: negative inputs go down to the next multiple of the width, not toward zero
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_bigint_truncate_transform) { // truncate with width 10 on a non-nullable BIGINT column
+    const std::vector<int64_t> values({1, -1});
+    auto column = ColumnInt64::create();
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_bigint(column->get_ptr(), 
std::make_shared<DataTypeInt64>(),
+                                      "test_bigint");
+
+    Block block({test_bigint});
+    TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT);
+    BigintTruncatePartitionColumnTransform transform(source_type, 10);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt64*>(result.column.get())->get_data(); // the result column stays 64-bit
+    std::vector<int64_t> expected_data = {0, -10}; // 1 -> 0 and -1 -> -10: negative inputs go down to the next multiple of the width, not toward zero
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_decimal32_truncate_transform) { // truncate with width 50 on a Decimal32 column declared as decimal(4, 2)
+    const std::vector<int32_t> values({1065}); // raw stored integer; presumably 10.65 at scale 2
+    auto column = ColumnDecimal32::create(0, 2);
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_decimal32(column->get_ptr(),
+                                         
std::make_shared<DataTypeDecimal<Decimal32>>(4, 2),
+                                         "test_decimal32");
+
+    Block block({test_decimal32});
+    TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2);
+    DecimalTruncatePartitionColumnTransform<Decimal32> transform(source_type, 
50);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnDecimal32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {1050}; // 1065 reduced to a multiple of 50, compared on the raw stored value
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i].value); // compare the raw integer inside the decimal wrapper
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_string_truncate_transform) { // truncate with width 3 on a non-nullable string column
+    const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}}); // length excludes the terminating NUL
+    auto column = ColumnString::create();
+    column->insert_many_strings(&values[0], values.size());
+    ColumnWithTypeAndName test_string(column->get_ptr(), 
std::make_shared<DataTypeString>(),
+                                      "test_string");
+
+    Block block({test_string});
+    TypeDescriptor source_type = TypeDescriptor::create_string_type();
+    StringTruncatePartitionColumnTransform transform(source_type, 3);
+
+    auto result = transform.apply(block, 0);
+    const auto result_column = assert_cast<const 
ColumnString*>(result.column.get());
+    const char result_data[] = {'i', 'c', 'e'}; // expected value: the first 3 characters of "iceberg" (array has no NUL terminator)
+    std::vector<StringRef> expected_data = {
+            {result_data, sizeof(result_data) / sizeof(result_data[0])}};
+    EXPECT_EQ(expected_data.size(), result_column->size());
+    for (size_t i = 0; i < result_column->size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_column->get_data_at(i));
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_integer_bucket_transform) { // bucket with 16 buckets on a non-nullable INT column
+    const std::vector<int32_t> values({34, -123}); // presumably the 32-bit hashes of the two inputs: 2017239379, -471378254
+    auto column = ColumnInt32::create();
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_int(column->get_ptr(), 
std::make_shared<DataTypeInt32>(),
+                                   "test_int");
+
+    Block block({test_int});
+    TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+    IntBucketPartitionColumnTransform transform(source_type, 16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {3, 2}; // consistent with (hash & INT32_MAX) % 16 applied to the two numbers noted above
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_bigint_bucket_transform) { // bucket with 16 buckets on a non-nullable BIGINT column
+    const std::vector<int64_t> values({34, -123}); // presumably the 32-bit hashes of the two inputs: 2017239379, -471378254
+    auto column = ColumnInt64::create();
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_bigint(column->get_ptr(), 
std::make_shared<DataTypeInt64>(),
+                                      "test_bigint");
+
+    Block block({test_bigint});
+    TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT);
+    BigintBucketPartitionColumnTransform transform(source_type, 16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data(); // the bucket column is Int32 even though the input is Int64
+    std::vector<int32_t> expected_data = {3, 2}; // same buckets as the INT test for the same values 34 and -123
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_decimal32_bucket_transform) { // bucket with 16 buckets on a Decimal32 column declared as decimal(4, 2)
+    const std::vector<int32_t> values({1420}); // raw stored integer; presumably hashes to -500754589
+    auto column = ColumnDecimal32::create(0, 2);
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_decimal32(column->get_ptr(),
+                                         
std::make_shared<DataTypeDecimal<Decimal32>>(4, 2),
+                                         "test_decimal32");
+
+    Block block({test_decimal32});
+    TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2);
+    DecimalBucketPartitionColumnTransform<Decimal32> transform(source_type, 
16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data(); // the bucket column is Int32
+    std::vector<int32_t> expected_data = {3}; // consistent with (hash & INT32_MAX) % 16 applied to the number noted above
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_date_bucket_transform) { // bucket with 16 buckets on a non-nullable DATEV2 column
+    auto column = ColumnDateV2::create();
+    auto& date_v2_data = column->get_data();
+    DateV2Value<DateV2ValueType> value;
+    value.set_time(2017, 11, 16, 0, 0, 0, 0); // date 2017-11-16; presumably hashes to -653330422
+    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); // store the value's raw 32-bit representation in the column
+    ColumnWithTypeAndName test_date(column->get_ptr(), 
std::make_shared<DataTypeDateV2>(),
+                                    "test_date");
+
+    Block block({test_date});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+    DateBucketPartitionColumnTransform transform(source_type, 16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {10}; // consistent with (hash & INT32_MAX) % 16 applied to the number noted above
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_bucket_transform) { // bucket with 16 buckets on a non-nullable DATETIMEV2 column
+    auto column = ColumnDateTimeV2::create();
+    auto& datetime_v2_data = column->get_data();
+    DateV2Value<DateTimeV2ValueType> value;
+    value.set_time(2017, 11, 16, 22, 31, 8, 0); // 2017-11-16 22:31:08; presumably hashes to -2047944441
+    datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); // store the value's raw 64-bit representation in the column
+    ColumnWithTypeAndName test_timestamp(column->get_ptr(), 
std::make_shared<DataTypeDateTimeV2>(),
+                                         "test_timestamp");
+
+    Block block({test_timestamp});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+    TimestampBucketPartitionColumnTransform transform(source_type, 16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {7}; // consistent with (hash & INT32_MAX) % 16 applied to the number noted above
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_string_bucket_transform) { // bucket with 16 buckets on a non-nullable string column
+    const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}}); 
// presumably the 32-bit hash of "iceberg": 1210000089
+    auto column = ColumnString::create();
+    column->insert_many_strings(&values[0], values.size());
+    ColumnWithTypeAndName test_string(column->get_ptr(), 
std::make_shared<DataTypeString>(),
+                                      "test_string");
+
+    Block block({test_string});
+    TypeDescriptor source_type(PrimitiveType::TYPE_STRING);
+    StringBucketPartitionColumnTransform transform(source_type, 16);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {9}; // consistent with 1210000089 % 16
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_date_year_transform) { // year transform on a non-nullable DATEV2 column
+    auto column = ColumnDateV2::create();
+    auto& date_v2_data = column->get_data();
+    DateV2Value<DateV2ValueType> value;
+    value.set_time(2017, 11, 16, 0, 0, 0, 0); // date 2017-11-16
+    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); // store the value's raw 32-bit representation in the column
+    ColumnWithTypeAndName test_date(column->get_ptr(), 
std::make_shared<DataTypeDateV2>(),
+                                    "test_date");
+
+    Block block({test_date});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+    DateYearPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {47}; // consistent with years since 1970: 2017 - 1970
+    std::vector<std::string> expected_human_string = {"2017"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_year_transform) { // year transform on a non-nullable DATETIMEV2 column
+    auto column = ColumnDateTimeV2::create();
+    auto& datetime_v2_data = column->get_data();
+    DateV2Value<DateTimeV2ValueType> value;
+    value.set_time(2017, 11, 16, 22, 31, 8, 0); // 2017-11-16 22:31:08
+    datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); // store the value's raw 64-bit representation in the column
+    ColumnWithTypeAndName test_timestamp(column->get_ptr(), 
std::make_shared<DataTypeDateTimeV2>(),
+                                         "test_timestamp");
+
+    Block block({test_timestamp});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+    TimestampYearPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {47}; // consistent with years since 1970: 2017 - 1970
+    std::vector<std::string> expected_human_string = {"2017"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_date_month_transform) { // month transform on a non-nullable DATEV2 column
+    auto column = ColumnDateV2::create();
+    auto& date_v2_data = column->get_data();
+    DateV2Value<DateV2ValueType> value;
+    value.set_time(2017, 11, 16, 0, 0, 0, 0); // date 2017-11-16
+    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); // store the value's raw 32-bit representation in the column
+    ColumnWithTypeAndName test_date(column->get_ptr(), 
std::make_shared<DataTypeDateV2>(),
+                                    "test_date");
+
+    Block block({test_date});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+    DateMonthPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {574}; // consistent with months since 1970-01: 47 * 12 + 10
+    std::vector<std::string> expected_human_string = {"2017-11"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_month_transform) { // month transform on a non-nullable DATETIMEV2 column
+    auto column = ColumnDateTimeV2::create();
+    auto& datetime_v2_data = column->get_data();
+    DateV2Value<DateTimeV2ValueType> value;
+    value.set_time(2017, 11, 16, 22, 31, 8, 0); // 2017-11-16 22:31:08
+    datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); // store the value's raw 64-bit representation in the column
+    ColumnWithTypeAndName test_timestamp(column->get_ptr(), 
std::make_shared<DataTypeDateTimeV2>(),
+                                         "test_timestamp");
+
+    Block block({test_timestamp});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+    TimestampMonthPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {574}; // consistent with months since 1970-01: 47 * 12 + 10
+    std::vector<std::string> expected_human_string = {"2017-11"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_date_day_transform) { // day transform on a non-nullable DATEV2 column
+    auto column = ColumnDateV2::create();
+    auto& date_v2_data = column->get_data();
+    DateV2Value<DateV2ValueType> value;
+    value.set_time(2017, 11, 16, 0, 0, 0, 0); // date 2017-11-16
+    date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); // store the value's raw 32-bit representation in the column
+    ColumnWithTypeAndName test_date(column->get_ptr(), 
std::make_shared<DataTypeDateV2>(),
+                                    "test_date");
+
+    Block block({test_date});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2);
+    DateDayPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {17486}; // consistent with days since 1970-01-01
+    std::vector<std::string> expected_human_string = {"2017-11-16"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_day_transform) { // day transform on a non-nullable DATETIMEV2 column
+    auto column = ColumnDateTimeV2::create();
+    auto& datetime_v2_data = column->get_data();
+    DateV2Value<DateTimeV2ValueType> value;
+    value.set_time(2017, 11, 16, 22, 31, 8, 0); // 2017-11-16 22:31:08
+    datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); // store the value's raw 64-bit representation in the column
+    ColumnWithTypeAndName test_timestamp(column->get_ptr(), 
std::make_shared<DataTypeDateTimeV2>(),
+                                         "test_timestamp");
+
+    Block block({test_timestamp});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+    TimestampDayPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {17486}; // same day number as the DATEV2 day test: the time of day does not change the result
+    std::vector<std::string> expected_human_string = {"2017-11-16"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_timestamp_hour_transform) { // hour transform on a non-nullable DATETIMEV2 column
+    auto column = ColumnDateTimeV2::create();
+    auto& datetime_v2_data = column->get_data();
+    DateV2Value<DateTimeV2ValueType> value;
+    value.set_time(2017, 11, 16, 22, 31, 8, 0); // 2017-11-16 22:31:08
+    datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); // store the value's raw 64-bit representation in the column
+    ColumnWithTypeAndName test_timestamp(column->get_ptr(), 
std::make_shared<DataTypeDateTimeV2>(),
+                                         "test_timestamp");
+
+    Block block({test_timestamp});
+    TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2);
+    TimestampHourPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_data = assert_cast<const 
ColumnInt32*>(result.column.get())->get_data();
+    std::vector<int32_t> expected_data = {419686}; // consistent with hours since 1970-01-01 00:00: 17486 * 24 + 22
+    std::vector<std::string> expected_human_string = {"2017-11-16-22"};
+    EXPECT_EQ(expected_data.size(), result_data.size());
+    for (size_t i = 0; i < result_data.size(); ++i) {
+        EXPECT_EQ(expected_data[i], result_data[i]);
+        EXPECT_EQ(expected_human_string[i],
+                  transform.to_human_string(transform.get_result_type(), 
result_data[i])); // the Int32 result is passed as the std::any argument
+    }
+}
+
+TEST_F(PartitionTransformersTest, test_void_transform) { // void transform on a non-nullable INT column
+    const std::vector<int32_t> values({1, -1});
+    auto column = ColumnInt32::create();
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size());
+    ColumnWithTypeAndName test_int(column->get_ptr(), 
std::make_shared<DataTypeInt32>(),
+                                   "test_int");
+
+    Block block({test_int});
+    TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+    VoidPartitionColumnTransform transform(source_type);
+
+    auto result = transform.apply(block, 0);
+
+    const auto& result_null_map_data =
+            assert_cast<const 
ColumnNullable*>(result.column.get())->get_null_map_data(); // the result must be a ColumnNullable even though the input was not
+
+    for (size_t i = 0; i < result_null_map_data.size(); ++i) {
+        EXPECT_EQ(1, result_null_map_data[i]); // every row is flagged NULL; the nested values are not checked
+    }
+}
+
+TEST_F(PartitionTransformersTest, 
test_nullable_column_integer_truncate_transform) { // truncate with width 10 on a nullable INT column
+    const std::vector<int32_t> values({1, -1});
+    auto column = ColumnNullable::create(ColumnInt32::create(), 
ColumnUInt8::create());
+    column->insert_data(nullptr, 0); // presumably appends a NULL row; the null-map assertions further down expect 1 at index 0
+    column->insert_many_fix_len_data(reinterpret_cast<const 
char*>(values.data()), values.size()); // rows 1 and 2 hold the non-null values 1 and -1
+    ColumnWithTypeAndName test_int(
+            column->get_ptr(),
+            
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()), 
"test_int");
+
+    Block block({test_int});
+    TypeDescriptor source_type(PrimitiveType::TYPE_INT);
+    IntegerTruncatePartitionColumnTransform transform(source_type, 10);
+
+    auto result = transform.apply(block, 0);
+
+    std::vector<int32_t> expected_data = {0, -10}; // expectations for the non-null rows only, in row order
+    std::vector<std::string> expected_human_string = {"0", "-10"};
+    const auto* result_column = assert_cast<const 
ColumnNullable*>(result.column.get()); // a nullable input must give a nullable result
+    const auto& result_data =
+            assert_cast<const 
ColumnInt32*>(result_column->get_nested_column_ptr().get())
+                    ->get_data();
+    const auto& null_map_column = result_column->get_null_map_column();
+
+    EXPECT_EQ(1, null_map_column[0]); // the null map of the input is carried over unchanged
+    EXPECT_EQ(0, null_map_column[1]);
+    EXPECT_EQ(0, null_map_column[2]);
+
+    for (size_t i = 0, j = 0; i < result_column->size(); ++i) { // i walks all rows; j advances only on non-null rows
+        if (null_map_column[i] == 0) {
+            EXPECT_EQ(expected_data[j], result_data[i]);
+            EXPECT_EQ(expected_human_string[j],
+                      transform.to_human_string(transform.get_result_type(), 
result_data[i]));
+            ++j;
+        }
+    }
+}
+
+} // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
new file mode 100644
index 00000000000..6fdb27e1d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java
+// and modified by Doris
+
+package org.apache.doris.common.info;
+
+import java.util.Objects;
+
+/**
+ * Immutable pair of a database name and a table name.
+ *
+ * <p>Equality and hashing are value based (both names), and {@link #toString()} renders
+ * the pair as {@code dbName.tbName}.
+ */
+public class SimpleTableInfo {
+
+    private final String dbName;
+    private final String tbName;
+
+    public SimpleTableInfo(String dbName, String tbName) {
+        this.dbName = dbName;
+        this.tbName = tbName;
+    }
+
+    /** Returns the database name passed to the constructor. */
+    public String getDbName() {
+        return this.dbName;
+    }
+
+    /** Returns the table name passed to the constructor. */
+    public String getTbName() {
+        return this.tbName;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.dbName, this.tbName);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        // Exact class comparison (not instanceof): an instance of a subclass is never equal.
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+        SimpleTableInfo rhs = (SimpleTableInfo) other;
+        if (!Objects.equals(this.dbName, rhs.dbName)) {
+            return false;
+        }
+        return Objects.equals(this.tbName, rhs.tbName);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s.%s", this.dbName, this.tbName);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index acda08b7378..68064c4e439 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
@@ -85,6 +86,20 @@ public class IcebergMetadataCache {
         return tableCache.get(key);
     }
 
+    /**
+     * Looks the table up through {@code getIcebergTable} and returns a
+     * {@link SerializableTable} copy of it. Both the lookup and the copy run while
+     * holding this cache object's monitor.
+     */
+    public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
+        synchronized (this) {
+            return SerializableTable.copyOf(getIcebergTable(catalog, dbName, tbName));
+        }
+    }
+
+    /**
+     * Builds the cache key for the given table and calls {@code loadTable} on it directly.
+     * NOTE(review): presumably this bypasses any cached entry -- confirm against loadTable.
+     */
+    public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) {
+        return loadTable(IcebergMetadataCacheKey.of(catalog, dbName, tbName));
+    }
+
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key.catalog, key.dbName, 
key.tableName);
@@ -116,7 +131,7 @@ public class IcebergMetadataCache {
     public void invalidateCatalogCache(long catalogId) {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId)
-            .forEach(snapshotListCache::invalidate);
+                .forEach(snapshotListCache::invalidate);
 
         tableCache.asMap().entrySet().stream()
                 .filter(entry -> entry.getKey().catalog.getId() == catalogId)
@@ -130,7 +145,7 @@ public class IcebergMetadataCache {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId && 
key.dbName.equals(dbName) && key.tableName.equals(
                         tblName))
-            .forEach(snapshotListCache::invalidate);
+                .forEach(snapshotListCache::invalidate);
 
         tableCache.asMap().entrySet().stream()
                 .filter(entry -> {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index c7fef68ee97..a6933f83d76 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         return catalog;
     }
 
+    public IcebergExternalCatalog getExternalCatalog() { // accessor for the dorisCatalog field
+        return dorisCatalog;
+    }
+
     @Override
     public void close() {
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 5025e075142..a3a978ccd7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -21,31 +21,39 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TFileContent;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TIcebergCommitData;
+import org.apache.doris.thrift.TUpdateMode;
 import org.apache.doris.transaction.Transaction;
 
-import com.google.common.base.VerifyException;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.Metrics;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 public class IcebergTransaction implements Transaction {
 
     private static final Logger LOG = 
LogManager.getLogger(IcebergTransaction.class);
+
     private final IcebergMetadataOps ops;
+    private SimpleTableInfo tableInfo;
+    private Table table;
+
+
     private org.apache.iceberg.Transaction transaction;
     private final List<TIcebergCommitData> commitDataList = 
Lists.newArrayList();
 
@@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction {
         }
     }
 
-    public void beginInsert(String dbName, String tbName) {
-        Table icebergTable = 
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
-        transaction = icebergTable.newTransaction();
+    public void beginInsert(SimpleTableInfo tableInfo) { // records the target table and opens an Iceberg transaction on it
+        this.tableInfo = tableInfo;
+        this.table = getNativeTable(tableInfo); // resolved through IcebergUtils.getRemoteTable
+        this.transaction = table.newTransaction(); // later finished by commit() via commitTransaction()
     }
 
-    public void finishInsert() {
-        Table icebergTable = transaction.table();
-        AppendFiles appendFiles = transaction.newAppend();
-
-        for (CommitTaskData task : convertToCommitTaskData()) {
-            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withPath(task.getPath())
-                    .withFileSizeInBytes(task.getFileSizeInBytes())
-                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
-                    .withMetrics(task.getMetrics());
-
-            if (icebergTable.spec().isPartitioned()) {
-                List<String> partitionValues = task.getPartitionValues()
-                        .orElseThrow(() -> new VerifyException("No partition 
data for partitioned table"));
-                builder.withPartitionValues(partitionValues);
-            }
-            appendFiles.appendFile(builder.build());
+    /**
+     * Turns the commit data collected so far into manifest updates for the target table.
+     *
+     * @param tableInfo table being written; only used for the debug message here
+     * @param insertCtx optional insert context; when present it is cast to
+     *                  {@code BaseExternalTableInsertCommandContext} and its overwrite flag
+     *                  selects {@code OVERWRITE} instead of the default {@code APPEND}
+     */
+    public void finishInsert(SimpleTableInfo tableInfo, 
Optional<InsertCommandContext> insertCtx) {
+        if (LOG.isDebugEnabled()) {
+            // Emitted at debug level so the message matches the isDebugEnabled() guard.
+            LOG.debug("iceberg table {} insert table finished!", tableInfo);
         }
 
-        // in appendFiles.commit, it will generate metadata(manifest and 
snapshot)
-        // after appendFiles.commit, in current transaction, you can already 
see the new snapshot
-        appendFiles.commit();
-    }
-
-    public List<CommitTaskData> convertToCommitTaskData() {
-        List<CommitTaskData> commitTaskData = new ArrayList<>();
-        for (TIcebergCommitData data : this.commitDataList) {
-            commitTaskData.add(new CommitTaskData(
-                    data.getFilePath(),
-                    data.getFileSize(),
-                    new Metrics(
-                            data.getRowCount(),
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP
-                    ),
-                    data.isSetPartitionValues() ? 
Optional.of(data.getPartitionValues()) : Optional.empty(),
-                    convertToFileContent(data.getFileContent()),
-                    data.isSetReferencedDataFiles() ? 
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
-            ));
+        //create and start the iceberg transaction
+        TUpdateMode updateMode = TUpdateMode.APPEND;
+        if (insertCtx.isPresent()) {
+            updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+                    : TUpdateMode.APPEND;
         }
-        return commitTaskData;
+        updateManifestAfterInsert(updateMode);
     }
 
-    private FileContent convertToFileContent(TFileContent content) {
-        if (content.equals(TFileContent.DATA)) {
-            return FileContent.DATA;
-        } else if (content.equals(TFileContent.POSITION_DELETES)) {
-            return FileContent.POSITION_DELETES;
+    /**
+     * Converts the collected commit data into a single {@code WriteResult} and hands it to
+     * the partitioned or unpartitioned manifest update, depending on the table's spec.
+     *
+     * @param updateMode {@code APPEND} or {@code OVERWRITE}, forwarded to the manifest update
+     */
+    private void updateManifestAfterInsert(TUpdateMode updateMode) {
+        PartitionSpec spec = table.spec();
+        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+        //convert commitDataList to writeResult
+        WriteResult writeResult = IcebergWriterHelper
+                .convertToWriterResult(fileFormat, spec, commitDataList);
+        List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+        if (spec.isPartitioned()) {
+            partitionManifestUpdate(updateMode, table, pendingResults);
+            if (LOG.isDebugEnabled()) {
+                // debug level, consistent with the isDebugEnabled() guard
+                LOG.debug("{} {} table partition manifest  successful and 
writeResult : {}..", tableInfo, updateMode,
+                        writeResult);
+            }
         } else {
-            return FileContent.EQUALITY_DELETES;
+            tableManifestUpdate(updateMode, table, pendingResults);
+            if (LOG.isDebugEnabled()) {
+                // debug level, consistent with the isDebugEnabled() guard
+                LOG.debug("{} {}  table  manifest  successful and writeResult : 
{}..", tableInfo, updateMode,
+                        writeResult);
+            }
         }
     }
 
     @Override
     public void commit() throws UserException {
-        // Externally readable
-        // Manipulate the relevant data so that others can also see the latest 
table, such as:
-        //   1. hadoop: it will change the version number information in 
'version-hint.text'
-        //   2. hive: it will change the table properties, the most important 
thing is to revise 'metadata_location'
-        //   3. and so on ...
+        // commit the iceberg transaction
         transaction.commitTransaction();
     }
 
     @Override
     public void rollback() {
-
+        //do nothing
     }
 
     public long getUpdateCnt() {
         return 
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
     }
 
-    public static class CommitTaskData {
-        private final String path;
-        private final long fileSizeInBytes;
-        private final Metrics metrics;
-        private final Optional<List<String>> partitionValues;
-        private final FileContent content;
-        private final Optional<List<String>> referencedDataFiles;
-
-        public CommitTaskData(String path,
-                              long fileSizeInBytes,
-                              Metrics metrics,
-                              Optional<List<String>> partitionValues,
-                              FileContent content,
-                              Optional<List<String>> referencedDataFiles) {
-            this.path = path;
-            this.fileSizeInBytes = fileSizeInBytes;
-            this.metrics = metrics;
-            this.partitionValues = 
convertPartitionValuesForNull(partitionValues);
-            this.content = content;
-            this.referencedDataFiles = referencedDataFiles;
-        }
 
-        private Optional<List<String>> 
convertPartitionValuesForNull(Optional<List<String>> partitionValues) {
-            if (!partitionValues.isPresent()) {
-                return partitionValues;
-            }
-            List<String> values = partitionValues.get();
-            if (!values.contains("null")) {
-                return partitionValues;
-            }
-            return Optional.of(values.stream().map(s -> s.equals("null") ? 
null : s).collect(Collectors.toList()));
-        }
+    /**
+     * Resolves the Iceberg table for {@code tableInfo} through this transaction's external catalog.
+     *
+     * @throws NullPointerException if {@code tableInfo} is null
+     */
+    private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
+        Objects.requireNonNull(tableInfo);
+        return IcebergUtils.getRemoteTable(ops.getExternalCatalog(), tableInfo);
+    }
 
-        public String getPath() {
-            return path;
+    /**
+     * Stages the pending data files of a partitioned table: an append in {@code APPEND}
+     * mode, otherwise a partition replacement.
+     *
+     * @param updateMode     {@code APPEND} or anything else (treated as overwrite)
+     * @param table          target table; used for logging and passed on to the append path
+     * @param pendingResults write results to stage; a null or empty list only logs a warning
+     */
+    private void partitionManifestUpdate(TUpdateMode updateMode, Table table, 
List<WriteResult> pendingResults) {
+        if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+            LOG.warn("{} partitionManifestUp method call but pendingResults is 
null or empty!", table.name());
+            return;
         }
-
-        public long getFileSizeInBytes() {
-            return fileSizeInBytes;
+        // Commit the appendPartitionOperator transaction.
+        if (updateMode == TUpdateMode.APPEND) {
+            commitAppendTxn(table, pendingResults);
+        } else {
+            // Create the operation from the open transaction, not from the table: an operation
+            // created on the table publishes on its own commit(), which would leave
+            // transaction.commitTransaction() in commit() with nothing to publish.
+            ReplacePartitions appendPartitionOp = transaction.newReplacePartitions();
+            for (WriteResult result : pendingResults) {
+                Preconditions.checkState(result.referencedDataFiles().length 
== 0,
+                        "Should have no referenced data files.");
+                
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+            }
+            appendPartitionOp.commit();
         }
+    }
 
-        public Metrics getMetrics() {
-            return metrics;
+    /**
+     * Stages the pending data files of an unpartitioned table: an append in {@code APPEND}
+     * mode, otherwise a replacement.
+     *
+     * @param updateMode     {@code APPEND} or anything else (treated as overwrite)
+     * @param table          target table; used for logging and passed on to the append path
+     * @param pendingResults write results to stage; a null or empty list only logs a warning
+     */
+    private void tableManifestUpdate(TUpdateMode updateMode, Table table, 
List<WriteResult> pendingResults) {
+        if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+            LOG.warn("{} tableManifestUp method call but pendingResults is 
null or empty!", table.name());
+            return;
         }
-
-        public Optional<List<String>> getPartitionValues() {
-            return partitionValues;
+        // Commit the appendPartitionOperator transaction.
+        if (LOG.isDebugEnabled()) {
+            // debug level, consistent with the isDebugEnabled() guard
+            LOG.debug("{} tableManifestUp method call  ", table.name());
         }
-
-        public FileContent getContent() {
-            return content;
+        if (updateMode == TUpdateMode.APPEND) {
+            commitAppendTxn(table, pendingResults);
+        } else {
+            // Create the operation from the open transaction, not from the table: an operation
+            // created on the table publishes on its own commit(), which would leave
+            // transaction.commitTransaction() in commit() with nothing to publish.
+            ReplacePartitions appendPartitionOp = transaction.newReplacePartitions();
+            for (WriteResult result : pendingResults) {
+                Preconditions.checkState(result.referencedDataFiles().length 
== 0,
+                        "Should have no referenced data files.");
+                
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+            }
+            appendPartitionOp.commit();
         }
+    }
 
-        public Optional<List<String>> getReferencedDataFiles() {
-            return referencedDataFiles;
+
+    /**
+     * Stages all data files of {@code pendingResults} as one append inside the open
+     * Iceberg transaction; they are published when {@code commit()} calls
+     * {@code transaction.commitTransaction()}.
+     *
+     * @param table          kept for signature compatibility with existing callers; the
+     *                       append is created from the transaction opened in beginInsert
+     * @param pendingResults write results whose data files are appended; each must have
+     *                       no referenced data files
+     */
+    private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
+        // To be compatible with iceberg format V1.
+        // Created from the transaction (as the pre-refactoring code did): an append created
+        // on the table would publish immediately on commit(), outside the transaction.
+        AppendFiles appendFiles = transaction.newAppend();
+        for (WriteResult result : pendingResults) {
+            Preconditions.checkState(result.referencedDataFiles().length == 0,
+                    "Should have no referenced data files for append.");
+            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
         }
+        appendFiles.commit();
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 2aa5dda35a4..512e6a3ee93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
@@ -50,6 +51,7 @@ import 
org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -87,6 +89,8 @@ public class IcebergUtils {
     // https://iceberg.apache.org/spec/#schemas-and-data-types
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
+    private static final String PARQUET_NAME = "parquet";
+    private static final String ORC_NAME = "orc";
 
     public static final String TOTAL_RECORDS = "total-records";
     public static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
@@ -522,8 +526,8 @@ public class IcebergUtils {
             case MAP:
                 Types.MapType map = (Types.MapType) type;
                 return new MapType(
-                    icebergTypeToDorisType(map.keyType()),
-                    icebergTypeToDorisType(map.valueType())
+                        icebergTypeToDorisType(map.keyType()),
+                        icebergTypeToDorisType(map.valueType())
                 );
             case STRUCT:
                 Types.StructType struct = (Types.StructType) type;
@@ -536,11 +540,30 @@ public class IcebergUtils {
         }
     }
 
+
     public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog 
catalog, String dbName, String tblName) {
+        return getIcebergTableInternal(catalog, dbName, tblName, false); // isClone=false: served by metadataCache.getIcebergTable
+    }
+
+    public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog 
catalog, SimpleTableInfo tableInfo) {
+        return getIcebergTableInternal(catalog, tableInfo.getDbName(), 
tableInfo.getTbName(), true); // isClone=true: served by metadataCache.getAndCloneTable
+    }
+
+    public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog 
catalog, SimpleTableInfo tableInfo) {
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(catalog, dbName, tblName);
+                .getRemoteTable(catalog, tableInfo.getDbName(), 
tableInfo.getTbName()); // delegates to IcebergMetadataCache.getRemoteTable
+    }
+
+    private static org.apache.iceberg.Table 
getIcebergTableInternal(ExternalCatalog catalog, String dbName,
+            String tblName,
+            boolean isClone) { // isClone selects between the two cache accessors below
+        IcebergMetadataCache metadataCache = Env.getCurrentEnv()
+                .getExtMetaCacheMgr()
+                .getIcebergMetadataCache();
+        return isClone ? metadataCache.getAndCloneTable(catalog, dbName, 
tblName)
+                : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
     /**
@@ -587,17 +610,27 @@ public class IcebergUtils {
         return -1;
     }
 
-    public static String getFileFormat(Table table) {
-        Map<String, String> properties = table.properties();
+
+    public static FileFormat getFileFormat(Table icebergTable) { // maps the table's configured format name to FileFormat.ORC or FileFormat.PARQUET
+        Map<String, String> properties = icebergTable.properties();
+        String fileFormatName;
         if (properties.containsKey(WRITE_FORMAT)) {
-            return properties.get(WRITE_FORMAT);
+            fileFormatName = properties.get(WRITE_FORMAT); // WRITE_FORMAT wins when the key is present
+        } else {
+            fileFormatName = 
properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME); // otherwise the table default, falling back to PARQUET_NAME
         }
-        if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
-            return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+        FileFormat fileFormat;
+        if (fileFormatName.toLowerCase().contains(ORC_NAME)) { // case-insensitive substring match; ORC is tested first
+            fileFormat = FileFormat.ORC;
+        } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) {
+            fileFormat = FileFormat.PARQUET;
+        } else {
+            throw new RuntimeException("Unsupported input format type: " + 
fileFormatName); // any name containing neither "orc" nor "parquet" is rejected
         }
-        return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+        return fileFormat;
     }
 
+
     public static String getFileCompress(Table table) {
         Map<String, String> properties = table.properties();
         if (properties.containsKey(COMPRESSION_CODEC)) {
@@ -605,11 +638,11 @@ public class IcebergUtils {
         } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
             return properties.get(SPARK_SQL_COMPRESSION_CODEC);
         }
-        String fileFormat = getFileFormat(table);
-        if (fileFormat.equalsIgnoreCase("parquet")) {
+        FileFormat fileFormat = getFileFormat(table);
+        if (fileFormat == FileFormat.PARQUET) {
             return properties.getOrDefault(
                     TableProperties.PARQUET_COMPRESSION, 
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
-        } else if (fileFormat.equalsIgnoreCase("orc")) {
+        } else if (fileFormat == FileFormat.ORC) {
             return properties.getOrDefault(
                     TableProperties.ORC_COMPRESSION, 
TableProperties.ORC_COMPRESSION_DEFAULT);
         }
@@ -620,9 +653,10 @@ public class IcebergUtils {
         Map<String, String> properties = table.properties();
         if 
(properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
             throw new NotSupportedException(
-                "Table " + table.name() + " specifies " + 
properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
-                    + " as a location provider. "
-                    + "Writing to Iceberg tables with custom location provider 
is not supported.");
+                    "Table " + table.name() + " specifies " + properties
+                            .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
+                            + " as a location provider. "
+                            + "Writing to Iceberg tables with custom location 
provider is not supported.");
         }
         String dataLocation = 
properties.get(TableProperties.WRITE_DATA_LOCATION);
         if (dataLocation == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
new file mode 100644
index 00000000000..4171a0536f9
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.helper;
+
+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergCommitData;
+
+import com.google.common.base.VerifyException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.WriteResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Static helpers that turn the commit data reported for written files into Iceberg
+ * {@link DataFile} / {@link WriteResult} objects.
+ */
+public class IcebergWriterHelper {
+
+    // Each TIcebergCommitData entry is converted into exactly one DataFile.
+    private static final int DEFAULT_FILE_COUNT = 1;
+
+    /**
+     * Builds one {@link DataFile} per commit-data entry and wraps them in a {@link WriteResult}.
+     *
+     * @param format         file format recorded on every data file
+     * @param spec           partition spec of the target table
+     * @param commitDataList per-file commit data (path, size, row count, partition values)
+     * @return a write result holding all generated data files
+     * @throws VerifyException if {@code spec} is partitioned and an entry carries no partition values
+     */
+    public static WriteResult convertToWriterResult(
+            FileFormat format,
+            PartitionSpec spec,
+            List<TIcebergCommitData> commitDataList) {
+        List<DataFile> dataFiles = new ArrayList<>();
+        for (TIcebergCommitData commitData : commitDataList) {
+            String location = commitData.getFilePath();
+            CommonStatistics stat = new CommonStatistics(
+                    commitData.getRowCount(), DEFAULT_FILE_COUNT, commitData.getFileSize());
+
+            // Partition values are only attached (and required) for partitioned tables.
+            Optional<List<String>> partValues = Optional.empty();
+            if (spec.isPartitioned()) {
+                List<String> rawValues = commitData.getPartitionValues();
+                if (Objects.isNull(rawValues) || rawValues.isEmpty()) {
+                    throw new VerifyException("No partition data for partitioned table");
+                }
+                // The literal string "null" stands for a null partition value.
+                List<String> normalized = rawValues.stream()
+                        .map(value -> value.equals("null") ? null : value)
+                        .collect(Collectors.toList());
+                partValues = Optional.of(normalized);
+            }
+            dataFiles.add(genDataFile(format, location, spec, partValues, stat));
+        }
+        WriteResult.Builder resultBuilder = WriteResult.builder();
+        resultBuilder.addDataFiles(dataFiles);
+        return resultBuilder.build();
+    }
+
+    /**
+     * Builds a single {@link DataFile} from a path, its statistics and optional partition values.
+     *
+     * @param format     file format of the data file
+     * @param location   path of the data file
+     * @param spec       partition spec the builder is created for
+     * @param partValues partition values; applied to the builder only when present
+     * @param statistics supplies the file size in bytes and the record count
+     */
+    public static DataFile genDataFile(
+            FileFormat format,
+            String location,
+            PartitionSpec spec,
+            Optional<List<String>> partValues,
+            CommonStatistics statistics) {
+        DataFiles.Builder fileBuilder = DataFiles.builder(spec);
+        fileBuilder.withPath(location);
+        fileBuilder.withFileSizeInBytes(statistics.getTotalFileBytes());
+        fileBuilder.withRecordCount(statistics.getRowCount());
+        fileBuilder.withFormat(format);
+        if (partValues.isPresent()) {
+            fileBuilder.withPartitionValues(partValues.get());
+        }
+        return fileBuilder.build();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index e590e918344..56ff188f964 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource {
 
     @Override
     public String getFileFormat() {
-        return IcebergUtils.getFileFormat(originTable);
+        return IcebergUtils.getFileFormat(originTable).name(); // getFileFormat now returns a FileFormat enum; name() yields the constant's name (e.g. PARQUET, ORC)
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 06b785a15f8..5e9860171d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource {
     private final org.apache.iceberg.Table icebergTable;
 
     public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
-                            Map<String, ColumnRange> columnNameToRange) {
+            Map<String, ColumnRange> columnNameToRange) {
         this.hmsTable = hmsTable;
         this.desc = desc;
         this.columnNameToRange = columnNameToRange;
@@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
 
     @Override
     public String getFileFormat() throws DdlException, MetaNotFoundException {
-        return IcebergUtils.getFileFormat(icebergTable);
+        return IcebergUtils.getFileFormat(icebergTable).name(); // getFileFormat now returns a FileFormat enum; name() yields the constant's name (e.g. PARQUET, ORC)
     }
 
     public org.apache.iceberg.Table getIcebergTable() throws 
MetaNotFoundException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
new file mode 100644
index 00000000000..9685dfdf35a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.statistics;
+
+public class CommonStatistics { // value holder for a row count, a file count and a total byte size
+    // Shared instance built with 0 for all three counters.
+    public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 
0L);
+    // Counters; each is final and assigned only in the constructor.
+    private final long rowCount;
+    private final long fileCount;
+    private final long totalFileBytes;
+    // Stores the three arguments as given; no range validation happens here.
+    public CommonStatistics(long rowCount, long fileCount, long 
totalFileBytes) {
+        this.fileCount = fileCount;
+        this.rowCount = rowCount;
+        this.totalFileBytes = totalFileBytes;
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public long getFileCount() {
+        return fileCount;
+    }
+
+    public long getTotalFileBytes() {
+        return totalFileBytes;
+    }
+    // Combines two statistics counter by counter with one operator and returns a new instance.
+    public static CommonStatistics reduce(
+            CommonStatistics current,
+            CommonStatistics update,
+            ReduceOperator operator) {
+        return new CommonStatistics(
+                reduce(current.getRowCount(), update.getRowCount(), operator),
+                reduce(current.getFileCount(), update.getFileCount(), 
operator),
+                reduce(current.getTotalFileBytes(), 
update.getTotalFileBytes(), operator));
+    }
+    // Applies operator only when both inputs are non-negative; otherwise the result is 0.
+    public static long reduce(long current, long update, ReduceOperator 
operator) {
+        if (current >= 0 && update >= 0) {
+            switch (operator) {
+                case ADD:
+                    return current + update; // plain long addition, no overflow check
+                case SUBTRACT:
+                    return current - update; // negative when update > current
+                case MAX:
+                    return Math.max(current, update);
+                case MIN:
+                    return Math.min(current, update);
+                default:
+                    throw new IllegalArgumentException("Unexpected operator: " 
+ operator);
+            }
+        }
+        // Reached only when current or update is negative (every switch arm returns or throws).
+        return 0;
+    }
+    // Operators accepted by the reduce() methods above.
+    public enum ReduceOperator {
+        ADD,
+        SUBTRACT,
+        MIN,
+        MAX,
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index b19c483c9f3..86b1f1ef0b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
      * constructor
      */
     public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable 
table,
-                                 String labelName, NereidsPlanner planner,
-                                 Optional<InsertCommandContext> insertCtx,
-                                 boolean emptyInsert) {
+            String labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx,
+            boolean emptyInsert) {
         super(ctx, table, labelName, planner, insertCtx, emptyInsert);
     }
 
@@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
         
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
     }
 
+    @Override
+    protected void beforeExec() { // begins the insert on the transaction registered under txnId
+        String dbName = ((IcebergExternalTable) table).getDbName();
+        String tbName = table.getName();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); // db + table name of the insert target
+        IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
+        transaction.beginInsert(tableInfo);
+    }
+
     @Override
     protected void doBeforeCommit() throws UserException {
+        String dbName = ((IcebergExternalTable) table).getDbName();
+        String tbName = table.getName();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
         IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        loadedRows = transaction.getUpdateCnt();
-        transaction.finishInsert();
+        this.loadedRows = transaction.getUpdateCnt();
+        transaction.finishInsert(tableInfo, insertCtx);
     }
 
     @Override
@@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
         return TransactionType.ICEBERG;
     }
 
-    @Override
-    protected void beforeExec() {
-        IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        transaction.beginInsert(((IcebergExternalTable) table).getDbName(), 
table.getName());
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 659be7cb1fe..0e01b599964 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -121,7 +121,7 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         }
 
         // file info
-        
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
+        
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
         
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
 
         // hadoop config
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index 4f4fe956d4b..f373c133685 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
@@ -58,12 +59,12 @@ public class IcebergTransactionManager implements 
TransactionManager {
     }
 
     @Override
-    public Transaction getTransaction(long id) {
+    public IcebergTransaction getTransaction(long id) {
         return getTransactionWithException(id);
     }
 
-    public Transaction getTransactionWithException(long id) {
-        Transaction icebergTransaction = transactions.get(id);
+    public IcebergTransaction getTransactionWithException(long id) {
+        IcebergTransaction icebergTransaction = transactions.get(id);
         if (icebergTransaction == null) {
             throw new RuntimeException("Can't find transaction for " + id);
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 10de5427902..4375dc5c025 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -18,15 +18,22 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.thrift.TFileContent;
 import org.apache.doris.thrift.TIcebergCommitData;
 
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
@@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Instant;
@@ -50,23 +58,26 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class IcebergTransactionTest {
 
-    public static String dbName = "db3";
-    public static String tbWithPartition = "tbWithPartition";
-    public static String tbWithoutPartition = "tbWithoutPartition";
-    public static IcebergMetadataOps ops;
-    public static Schema schema;
+    private static String dbName = "db3";
+    private static String tbWithPartition = "tbWithPartition";
+    private static String tbWithoutPartition = "tbWithoutPartition";
 
-    @BeforeClass
-    public static void beforeClass() throws IOException {
+    private IcebergExternalCatalog externalCatalog;
+    private IcebergMetadataOps ops;
+
+
+    @Before
+    public void init() throws IOException {
         createCatalog();
         createTable();
     }
 
-    public static void createCatalog() throws IOException {
+    private void createCatalog() throws IOException {
         Path warehousePath = Files.createTempDirectory("test_warehouse_");
         String warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
         HadoopCatalog hadoopCatalog = new HadoopCatalog();
@@ -74,25 +85,32 @@ public class IcebergTransactionTest {
         props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
         hadoopCatalog.setConf(new Configuration());
         hadoopCatalog.initialize("df", props);
-        ops = new IcebergMetadataOps(null, hadoopCatalog);
+        this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", 
"", Maps.newHashMap(), "");
+        new MockUp<IcebergHMSExternalCatalog>() {
+            @Mock
+            public Catalog getCatalog() {
+                return hadoopCatalog;
+            }
+        };
+        ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog);
     }
 
-    public static void createTable() throws IOException {
+    private void createTable() throws IOException {
         HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog();
         icebergCatalog.createNamespace(Namespace.of(dbName));
-        schema = new Schema(
-            Types.NestedField.required(11, "ts1", 
Types.TimestampType.withoutZone()),
-            Types.NestedField.required(12, "ts2", 
Types.TimestampType.withoutZone()),
-            Types.NestedField.required(13, "ts3", 
Types.TimestampType.withoutZone()),
-            Types.NestedField.required(14, "ts4", 
Types.TimestampType.withoutZone()),
-            Types.NestedField.required(15, "dt1", Types.DateType.get()),
-            Types.NestedField.required(16, "dt2", Types.DateType.get()),
-            Types.NestedField.required(17, "dt3", Types.DateType.get()),
-            Types.NestedField.required(18, "dt4", Types.DateType.get()),
-            Types.NestedField.required(19, "str1", Types.StringType.get()),
-            Types.NestedField.required(20, "str2", Types.StringType.get()),
-            Types.NestedField.required(21, "int1", Types.IntegerType.get()),
-            Types.NestedField.required(22, "int2", Types.IntegerType.get())
+        Schema schema = new Schema(
+                Types.NestedField.required(11, "ts1", 
Types.TimestampType.withoutZone()),
+                Types.NestedField.required(12, "ts2", 
Types.TimestampType.withoutZone()),
+                Types.NestedField.required(13, "ts3", 
Types.TimestampType.withoutZone()),
+                Types.NestedField.required(14, "ts4", 
Types.TimestampType.withoutZone()),
+                Types.NestedField.required(15, "dt1", Types.DateType.get()),
+                Types.NestedField.required(16, "dt2", Types.DateType.get()),
+                Types.NestedField.required(17, "dt3", Types.DateType.get()),
+                Types.NestedField.required(18, "dt4", Types.DateType.get()),
+                Types.NestedField.required(19, "str1", Types.StringType.get()),
+                Types.NestedField.required(20, "str2", Types.StringType.get()),
+                Types.NestedField.required(21, "int1", 
Types.IntegerType.get()),
+                Types.NestedField.required(22, "int2", Types.IntegerType.get())
         );
 
         PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
@@ -112,7 +130,7 @@ public class IcebergTransactionTest {
         icebergCatalog.createTable(TableIdentifier.of(dbName, 
tbWithoutPartition), schema);
     }
 
-    public List<String> createPartitionValues() {
+    private List<String> createPartitionValues() {
 
         Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
         long ts = DateTimeUtil.microsFromInstant(instant);
@@ -165,14 +183,23 @@ public class IcebergTransactionTest {
         ctdList.add(ctd1);
         ctdList.add(ctd2);
 
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, 
tbWithPartition));
+
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, 
SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
         IcebergTransaction txn = getTxn();
         txn.updateIcebergCommitData(ctdList);
-        txn.beginInsert(dbName, tbWithPartition);
-        txn.finishInsert();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, 
tbWithPartition);
+        txn.beginInsert(tableInfo);
+        txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
-        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, 
tbWithPartition));
-        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", 
"6");
 
+        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", 
"6");
         checkPushDownByPartitionForTs(table, "ts1");
         checkPushDownByPartitionForTs(table, "ts2");
         checkPushDownByPartitionForTs(table, "ts3");
@@ -189,7 +216,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartitionForBucketInt(table, "int1");
     }
 
-    public void checkPushDownByPartitionForBucketInt(Table table, String 
column) {
+    private void checkPushDownByPartitionForBucketInt(Table table, String 
column) {
         // (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0
         Integer i1 = 15;
 
@@ -212,12 +239,12 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan2, 2);
     }
 
-    public void checkPushDownByPartitionForString(Table table, String column) {
+    private void checkPushDownByPartitionForString(Table table, String column) 
{
         // Since the string used to create the partition is in date format, 
the date check can be reused directly
         checkPushDownByPartitionForDt(table, column);
     }
 
-    public void checkPushDownByPartitionForTs(Table table, String column) {
+    private void checkPushDownByPartitionForTs(Table table, String column) {
         String lessTs = "2023-12-11T12:34:56.123456";
         String eqTs = "2024-12-11T12:34:56.123456";
         String greaterTs = "2025-12-11T12:34:56.123456";
@@ -230,7 +257,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan, 0);
     }
 
-    public void checkPushDownByPartitionForDt(Table table, String column) {
+    private void checkPushDownByPartitionForDt(Table table, String column) {
         String less = "2023-12-11";
         String eq = "2024-12-11";
         String greater = "2025-12-11";
@@ -243,7 +270,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan, 0);
     }
 
-    public void checkPushDownByPartition(Table table, Expression expr, Integer 
expectFiles) {
+    private void checkPushDownByPartition(Table table, Expression expr, 
Integer expectFiles) {
         CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().filter(expr).planFiles();
         AtomicReference<Integer> cnt = new AtomicReference<>(0);
         fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1));
@@ -268,45 +295,64 @@ public class IcebergTransactionTest {
         ctdList.add(ctd1);
         ctdList.add(ctd2);
 
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, 
tbWithoutPartition));
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, 
SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
         IcebergTransaction txn = getTxn();
         txn.updateIcebergCommitData(ctdList);
-        txn.beginInsert(dbName, tbWithoutPartition);
-        txn.finishInsert();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, 
tbWithPartition);
+        txn.beginInsert(tableInfo);
+        txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
 
-        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, 
tbWithoutPartition));
         checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", 
"6");
     }
 
-    public void checkSnapshotProperties(Map<String, String> props,
-                                        String addRecords,
-                                        String addFileCnt,
-                                        String addFileSize) {
+    private IcebergTransaction getTxn() {
+        return new IcebergTransaction(ops);
+    }
+
+    private void checkSnapshotProperties(Map<String, String> props,
+            String addRecords,
+            String addFileCnt,
+            String addFileSize) {
         Assert.assertEquals(addRecords, props.get("added-records"));
         Assert.assertEquals(addFileCnt, props.get("added-data-files"));
         Assert.assertEquals(addFileSize, props.get("added-files-size"));
     }
 
-    public String numToYear(Integer num) {
+    private String numToYear(Integer num) {
         Transform<Object, Integer> year = Transforms.year();
         return year.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToMonth(Integer num) {
+    private String numToMonth(Integer num) {
         Transform<Object, Integer> month = Transforms.month();
         return month.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToDay(Integer num) {
+    private String numToDay(Integer num) {
         Transform<Object, Integer> day = Transforms.day();
         return day.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToHour(Integer num) {
+    private String numToHour(Integer num) {
         Transform<Object, Integer> hour = Transforms.hour();
         return hour.toHumanString(Types.IntegerType.get(), num);
     }
 
+    @Test
+    public void tableCloneTest() { // a loaded table, cast to Serializable, must clone to a non-null copy
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, 
tbWithoutPartition));
+        Table cloneTable = (Table) SerializationUtils.clone((Serializable) 
table);
+        Assert.assertNotNull(cloneTable); // only non-nullness is checked, not equality with the source table
+    }
+
     @Test
     public void testTransform() {
         Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
@@ -322,7 +368,4 @@ public class IcebergTransactionTest {
         Assert.assertEquals("2024-12-11", numToDay(dt));
     }
 
-    public IcebergTransaction getTxn() {
-        return new IcebergTransaction(ops);
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to