This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8930df3b31d [Feature](iceberg-writer) Implements iceberg partition transform. (#37692)
8930df3b31d is described below

commit 8930df3b31d25a4b2c85e3661c1afe520e74c7df
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Sat Jul 13 16:07:50 2024 +0800

    [Feature](iceberg-writer) Implements iceberg partition transform. (#37692)

    ## Proposed changes

    Cherry-pick iceberg partition transform functionality. #36289 #36889

    ---------

    Co-authored-by: kang <35803862+ghkan...@users.noreply.github.com>
    Co-authored-by: lik40 <li...@chinatelecom.cn>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
    Co-authored-by: Mingyu Chen <morning...@163.com>
---
 be/src/util/bit_util.h                             |   22 +
 .../sink/writer/iceberg/partition_transformers.cpp |  172 ++-
 .../sink/writer/iceberg/partition_transformers.h   | 1274 +++++++++++++++++++-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   22 +-
 .../writer/iceberg/partition_transformers_test.cpp |  489 ++++++++
 .../apache/doris/common/info/SimpleTableInfo.java  |   66 +
 .../datasource/iceberg/IcebergMetadataCache.java   |   19 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |    4 +
 .../datasource/iceberg/IcebergTransaction.java     |  211 ++--
 .../doris/datasource/iceberg/IcebergUtils.java     |   64 +-
 .../iceberg/helper/IcebergWriterHelper.java        |   91 ++
 .../iceberg/source/IcebergApiSource.java           |    2 +-
 .../iceberg/source/IcebergHMSSource.java           |    4 +-
 .../datasource/statistics/CommonStatistics.java    |   81 ++
 .../commands/insert/IcebergInsertExecutor.java     |   28 +-
 .../org/apache/doris/planner/IcebergTableSink.java |    2 +-
 .../transaction/IcebergTransactionManager.java     |    7 +-
 .../datasource/iceberg/IcebergTransactionTest.java |  139 ++-
 18 files changed, 2468 insertions(+), 229 deletions(-)

diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
         return (v << n) >> n;
     }
 
+    template <typename T>
+    static std::string IntToByteBuffer(T input) {
+        std::string buffer;
+        T value = input;
+        for (int i = 0; i < sizeof(value); ++i) {
+            // Applies a mask for a byte range on the input.
+            char value_to_save = value & 0XFF;
+            buffer.push_back(value_to_save);
+            // Remove the just processed part from the input so that we can exit early if there
+            // is nothing left to process.
+            value >>= 8;
+            if (value == 0 && value_to_save >= 0) {
+                break;
+            }
+            if (value == -1 && value_to_save < 0) {
+                break;
+            }
+        }
+        std::reverse(buffer.begin(), buffer.end());
+        return buffer;
+    }
+
     // Returns ceil(log2(x)).
     // TODO: this could be faster if we use __builtin_clz. Fix this if this ever shows up
     // in a hot path.
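The new BitUtil::IntToByteBuffer helper above emits the minimal-length, big-endian two's-complement byte string of an integer; the decimal bucket transform added later in this patch hashes exactly those bytes with murmur_hash3_32, which is how Iceberg defines bucketing for decimal values. The standalone sketch below simply mirrors the helper's logic outside the patch to show the encoding for a few sample values; the function and variable names here are illustrative only and are not part of the commit.

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <string>

    // Mirror of the helper's logic: drop redundant sign-extension bytes and
    // store the most-significant byte first.
    template <typename T>
    std::string int_to_byte_buffer(T input) {
        std::string buffer;
        T value = input;
        for (size_t i = 0; i < sizeof(value); ++i) {
            // Take the low byte; signed char keeps the sign checks below portable.
            signed char value_to_save = static_cast<signed char>(value & 0xFF);
            buffer.push_back(static_cast<char>(value_to_save));
            value >>= 8;  // arithmetic shift on signed T, so the sign propagates
            // Stop once every remaining byte would be pure sign extension.
            if (value == 0 && value_to_save >= 0) break;
            if (value == -1 && value_to_save < 0) break;
        }
        std::reverse(buffer.begin(), buffer.end());
        return buffer;
    }

    int main() {
        const int64_t samples[] = {123, 256, -1, -129};
        for (int64_t v : samples) {
            std::string bytes = int_to_byte_buffer(v);
            std::printf("%6lld ->", static_cast<long long>(v));
            for (unsigned char b : bytes) std::printf(" %02X", b);
            std::printf("\n");
        }
        // Prints: 123 -> 7B, 256 -> 01 00, -1 -> FF, -129 -> FF 7F
        return 0;
    }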
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp index 0faebea6295..ee8268d30f7 100644 --- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp +++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp @@ -25,31 +25,109 @@ namespace doris { namespace vectorized { +const std::chrono::sys_days PartitionColumnTransformUtils::EPOCH = std::chrono::sys_days( + std::chrono::year {1970} / std::chrono::January / std::chrono::day {1}); + std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create( const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type) { auto& transform = field.transform(); - static const std::regex hasWidth(R"((\w+)\[(\d+)\])"); + static const std::regex has_width(R"((\w+)\[(\d+)\])"); std::smatch width_match; - if (std::regex_match(transform, width_match, hasWidth)) { + if (std::regex_match(transform, width_match, has_width)) { std::string name = width_match[1]; - //int parsed_width = std::stoi(width_match[2]); + int parsed_width = std::stoi(width_match[2]); if (name == "truncate") { switch (source_type.type) { + case TYPE_INT: { + return std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_BIGINT: { + return std::make_unique<BigintTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + return std::make_unique<StringTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DECIMALV2: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>( + source_type, parsed_width); + } + case TYPE_DECIMAL32: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>( + source_type, parsed_width); + } + case TYPE_DECIMAL64: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>( + source_type, parsed_width); + } + case TYPE_DECIMAL128I: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>( + source_type, parsed_width); + } + case TYPE_DECIMAL256: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>( + source_type, parsed_width); + } default: { - throw doris::Exception( - doris::ErrorCode::INTERNAL_ERROR, - "Unsupported type for truncate partition column transform {}", - source_type.debug_string()); + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } } else if (name == "bucket") { switch (source_type.type) { + case TYPE_INT: { + return std::make_unique<IntBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_BIGINT: { + return std::make_unique<BigintBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + return std::make_unique<StringBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DATEV2: { + return std::make_unique<DateBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DECIMALV2: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>( + source_type, parsed_width); + } + case TYPE_DECIMAL32: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>( + 
source_type, parsed_width); + } + case TYPE_DECIMAL64: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>( + source_type, parsed_width); + } + case TYPE_DECIMAL128I: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>( + source_type, parsed_width); + } + case TYPE_DECIMAL256: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>( + source_type, parsed_width); + } default: { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported type for bucket partition column transform {}", - source_type.debug_string()); + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } } @@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create( if (transform == "identity") { return std::make_unique<IdentityPartitionColumnTransform>(source_type); + } else if (transform == "year") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateYearPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampYearPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "month") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateMonthPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampMonthPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "day") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateDayPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampDayPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "hour") { + switch (source_type.type) { + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampHourPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "void") { + return std::make_unique<VoidPartitionColumnTransform>(source_type); } else { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported partition column transform: {}.", transform); + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } +std::string PartitionColumnTransform::name() const { + return "default"; +} + std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type, const std::any& value) const { + return get_partition_value(type, value); +} + +std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor& type, + const std::any& value) const { if (value.has_value()) { switch (type.type) { case TYPE_BOOLEAN: { @@ -131,19 +274,12 @@ std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type } default: { throw 
doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported partition column transform: {}", - type.debug_string()); + "Unsupported type {} for partition", type.debug_string()); } } } return "null"; } -ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, int idx) { - const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(idx); - return {column_with_type_and_name.column, column_with_type_and_name.type, - column_with_type_and_name.name}; -} - } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h b/be/src/vec/sink/writer/iceberg/partition_transformers.h index 13c6238d1db..304c5834844 100644 --- a/be/src/vec/sink/writer/iceberg/partition_transformers.h +++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h @@ -45,25 +45,77 @@ public: const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type); }; +class PartitionColumnTransformUtils { +public: + static DateV2Value<DateV2ValueType>& epoch_date() { + static DateV2Value<DateV2ValueType> epoch_date; + static bool initialized = false; + if (!initialized) { + epoch_date.from_date_str("1970-01-01 00:00:00", 19); + initialized = true; + } + return epoch_date; + } + + static DateV2Value<DateTimeV2ValueType>& epoch_datetime() { + static DateV2Value<DateTimeV2ValueType> epoch_datetime; + static bool initialized = false; + if (!initialized) { + epoch_datetime.from_date_str("1970-01-01 00:00:00", 19); + initialized = true; + } + return epoch_datetime; + } + + static std::string human_year(int year_ordinal) { + auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::years(year_ordinal); + return std::to_string(static_cast<int>(ymd.year())); + } + + static std::string human_month(int month_ordinal) { + auto ymd = std::chrono::year_month_day {EPOCH} + std::chrono::months(month_ordinal); + return fmt::format("{:04d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month())); + } + + static std::string human_day(int day_ordinal) { + auto ymd = std::chrono::year_month_day(std::chrono::sys_days( + std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_ordinal)))); + return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day())); + } + + static std::string human_hour(int hour_ordinal) { + int day_value = hour_ordinal / 24; + int housr_value = hour_ordinal % 24; + auto ymd = std::chrono::year_month_day(std::chrono::sys_days( + std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_value)))); + return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()), + housr_value); + } + +private: + static const std::chrono::sys_days EPOCH; + PartitionColumnTransformUtils() = default; +}; + class PartitionColumnTransform { public: PartitionColumnTransform() = default; virtual ~PartitionColumnTransform() = default; - virtual bool preserves_non_null() const { return false; } - - virtual bool monotonic() const { return true; } - - virtual bool temporal() const { return false; } + virtual std::string name() const; virtual const TypeDescriptor& get_result_type() const = 0; - virtual bool is_void() const { return false; } - - virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0; + virtual ColumnWithTypeAndName apply(const Block& block, int column_pos) = 0; virtual std::string to_human_string(const TypeDescriptor& 
type, const std::any& value) const; + + virtual std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const; }; class IdentityPartitionColumnTransform : public PartitionColumnTransform { @@ -71,12 +123,1214 @@ public: IdentityPartitionColumnTransform(const TypeDescriptor& source_type) : _source_type(source_type) {} - virtual const TypeDescriptor& get_result_type() const { return _source_type; } + std::string name() const override { return "Identity"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + return {column_with_type_and_name.column, column_with_type_and_name.type, + column_with_type_and_name.name}; + } + +private: + TypeDescriptor _source_type; +}; + +class StringTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "StringTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + static_cast<void>(_width); + auto int_type = std::make_shared<DataTypeInt32>(); + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + + ColumnPtr string_column_ptr; + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + string_column_ptr = nullable_column->get_nested_column_ptr(); + is_nullable = true; + } else { + string_column_ptr = column_with_type_and_name.column; + is_nullable = false; + } + + // Create a temp_block to execute substring function. 
+ Block temp_block; + temp_block.insert(column_with_type_and_name); + temp_block.insert({int_type->create_column_const(temp_block.rows(), to_field(1)), int_type, + "const 1"}); + temp_block.insert({int_type->create_column_const(temp_block.rows(), to_field(_width)), + int_type, fmt::format("const {}", _width)}); + temp_block.insert({nullptr, std::make_shared<DataTypeString>(), "result"}); + ColumnNumbers temp_arguments(3); + temp_arguments[0] = 0; // str column + temp_arguments[1] = 1; // pos + temp_arguments[2] = 2; // width + size_t result_column_id = 3; + + SubstringUtil::substring_execute(temp_block, temp_arguments, result_column_id, + temp_block.rows()); + if (is_nullable) { + auto res_column = ColumnNullable::create( + temp_block.get_by_position(result_column_id).column, null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + auto res_column = temp_block.get_by_position(result_column_id).column; + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class IntegerTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "IntegerTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const int* end_in = in_data.data() + in_data.size(); + const Int32* __restrict p_in = in_data.data(); + Int32* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + *p_out = *p_in - ((*p_in % _width) + _width) % _width; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + 
: _source_type(source_type), _width(width) {} + + std::string name() const override { return "BigintTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt64::create(); + ColumnInt64::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const Int64* end_in = in_data.data() + in_data.size(); + const Int64* __restrict p_in = in_data.data(); + Int64* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + *p_out = *p_in - ((*p_in % _width) + _width) % _width; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +template <typename T> +class DecimalTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "DecimalTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + + ColumnPtr column_ptr; + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + is_nullable = true; + } else { + column_ptr = column_with_type_and_name.column; + is_nullable = false; + } + + const auto* const decimal_col = check_and_get_column<ColumnDecimal<T>>(column_ptr); + const auto& vec_src = decimal_col->get_data(); + + auto col_res = ColumnDecimal<T>::create(vec_src.size(), decimal_col->get_scale()); + auto& vec_res = col_res->get_data(); + + const typename T::NativeType* __restrict p_in = + reinterpret_cast<const T::NativeType*>(vec_src.data()); + const typename T::NativeType* end_in = + reinterpret_cast<const T::NativeType*>(vec_src.data()) + vec_src.size(); + typename T::NativeType* __restrict p_out = reinterpret_cast<T::NativeType*>(vec_res.data()); + + while (p_in < 
end_in) { + typename T::NativeType remainder = ((*p_in % _width) + _width) % _width; + *p_out = *p_in - remainder; + ++p_in; + ++p_out; + } + + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class IntBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "IntBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const int* end_in = in_data.data() + in_data.size(); + const Int32* __restrict p_in = in_data.data(); + Int32* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + Int64 long_value = static_cast<Int64>(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + // *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + *p_out = (hash_value & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class BigintBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "BigintBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& 
column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const Int64* end_in = in_data.data() + in_data.size(); + const Int64* __restrict p_in = in_data.data(); + Int32* __restrict p_out = out_data.data(); + while (p_in < end_in) { + Int64 long_value = static_cast<Int64>(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + // int value = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + int value = (hash_value & INT32_MAX) % _bucket_num; + *p_out = value; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +template <typename T> +class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "DecimalBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDecimal<T>*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + auto& vec_res = col_res->get_data(); + + const typename T::NativeType* __restrict p_in = + reinterpret_cast<const T::NativeType*>(in_data.data()); + const typename T::NativeType* end_in = + reinterpret_cast<const T::NativeType*>(in_data.data()) + in_data.size(); 
+ typename T::NativeType* __restrict p_out = reinterpret_cast<T::NativeType*>(vec_res.data()); + + while (p_in < end_in) { + std::string buffer = BitUtil::IntToByteBuffer(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(), buffer.size(), 0); + *p_out = (hash_value & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + return std::to_string(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class DateBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const auto* end_in = in_data.data() + in_data.size(); + + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + + int32_t days_from_unix_epoch = value.daynr() - 719528; + Int64 long_value = static_cast<Int64>(days_from_unix_epoch); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + + *p_out = (hash_value & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return 
{std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class TimestampBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const auto* end_in = in_data.data() + in_data.size(); + + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + + int64_t timestamp; + if (!value.unix_timestamp(&timestamp, "UTC")) { + LOG(WARNING) << "Failed to call unix_timestamp :" << value.debug_string(); + timestamp = 0; + } + Int64 long_value = static_cast<Int64>(timestamp) * 1000000; + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + + *p_out = (hash_value & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return std::to_string(std::any_cast<Int32>(value)); + ; + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class StringBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + StringBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "StringBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const
Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto* str_col = assert_cast<const ColumnString*>(column_ptr.get()); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + const auto& data = str_col->get_chars(); + const auto& offsets = str_col->get_offsets(); + + size_t offset_size = offsets.size(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(offset_size); + auto* __restrict p_out = out_data.data(); + + for (int i = 0; i < offset_size; i++) { + const unsigned char* raw_str = &data[offsets[i - 1]]; + ColumnString::Offset size = offsets[i] - offsets[i - 1]; + uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0); + + *p_out = (hash_value & INT32_MAX) % _bucket_num; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class DateYearPartitionColumnTransform : public PartitionColumnTransform { +public: + DateYearPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateYear"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampYearPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampYear"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class DateMonthPartitionColumnTransform : public PartitionColumnTransform 
{ +public: + DateMonthPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateMonth"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampMonth"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + 
const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class DateDayPartitionColumnTransform : public PartitionColumnTransform { +public: + DateDayPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateDay"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + 
DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + int day_value = std::any_cast<Int32>(value); + return PartitionColumnTransformUtils::human_day(day_value); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampDayPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampDay"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampHourPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string 
name() const override { return "TimestampHour"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class VoidPartitionColumnTransform : public PartitionColumnTransform { +public: + VoidPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(source_type) {} + + std::string name() const override { return "Void"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(const Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); - virtual ColumnWithTypeAndName apply(Block& block, int idx); + ColumnPtr column_ptr; + ColumnPtr null_map_column_ptr; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } else { + column_ptr = column_with_type_and_name.column; + } + auto res_column = ColumnNullable::create(std::move(column_ptr), + ColumnUInt8::create(column_ptr->size(), 1)); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } private: TypeDescriptor _source_type; + 
TypeDescriptor _target_type; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 2703330406c..e59b0593f7b 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -329,8 +329,8 @@ std::vector<std::string> VIcebergTableWriter::_partition_values( TypeDescriptor result_type = iceberg_partition_column.partition_column_transform().get_result_type(); partition_values.emplace_back( - iceberg_partition_column.partition_column_transform().to_human_string(result_type, - data.get(i))); + iceberg_partition_column.partition_column_transform().get_partition_value( + result_type, data.get(i))); } return partition_values; @@ -407,21 +407,25 @@ std::optional<PartitionData> VIcebergTableWriter::_get_partition_data( std::any VIcebergTableWriter::_get_iceberg_partition_value( const TypeDescriptor& type_desc, const ColumnWithTypeAndName& partition_column, int position) { - ColumnPtr column; - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) { + //1) get the partition column ptr + ColumnPtr col_ptr = partition_column.column->convert_to_full_column_if_const(); + CHECK(col_ptr != nullptr); + if (col_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(col_ptr.get()); auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); if (null_map_data[position]) { return std::any(); } - column = nullable_column->get_nested_column_ptr(); - } else { - column = partition_column.column; + col_ptr = nullable_column->get_nested_column_ptr(); } - auto [item, size] = column->get_data_at(position); + + //2) get partition field data from the partition block + auto [item, size] = col_ptr->get_data_at(position); switch (type_desc.type) { case TYPE_BOOLEAN: { vectorized::Field field = - vectorized::check_and_get_column<const ColumnUInt8>(*column)->operator[](position); + vectorized::check_and_get_column<const ColumnUInt8>(*col_ptr)->operator[](position); return field.get<bool>(); } case TYPE_TINYINT: { diff --git a/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp new file mode 100644 index 00000000000..a8df4f60d83 --- /dev/null +++ b/be/test/vec/sink/writer/iceberg/partition_transformers_test.cpp @@ -0,0 +1,489 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
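// Note on the hard-coded expectations in the tests below (per a reading of the
// Iceberg partition-transform spec; treat the details here as informational
// rather than normative):
//   * truncate[W] on integers is v - (((v % W) + W) % W): with W = 10 the
//     inputs {1, -1} become {0, -10}; decimals truncate on the unscaled value
//     (1065 -> 1050 for W = 50) and strings keep their first W characters
//     ("iceberg" -> "ice" for W = 3).
//   * bucket[N] is (murmur3_x86_32(bytes(v), seed 0) & INT32_MAX) % N, where
//     ints and bigints are hashed as an 8-byte little-endian long and strings
//     as their UTF-8 bytes; e.g. 34 hashes to 2017239379, and
//     2017239379 % 16 == 3 matches the expected bucket in the tests.
// A minimal sketch of that bucket expectation, assuming a hypothetical
// murmur3_x86_32(ptr, len, seed) helper (not provided by this file):
//
//   int32_t expected_bucket_for_long(int64_t v, int32_t num_buckets) {
//       int32_t hash = murmur3_x86_32(reinterpret_cast<const char*>(&v), sizeof(v), 0);
//       return (hash & INT32_MAX) % num_buckets;
//   }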
+ +#include "vec/sink/writer/iceberg/partition_transformers.h" + +#include <gtest/gtest.h> + +#include "vec/data_types/data_type_time_v2.h" + +namespace doris::vectorized { + +class PartitionTransformersTest : public testing::Test { +public: + PartitionTransformersTest() = default; + virtual ~PartitionTransformersTest() = default; +}; + +TEST_F(PartitionTransformersTest, test_integer_truncate_transform) { + const std::vector<int32_t> values({1, -1}); + auto column = ColumnInt32::create(); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_int(column->get_ptr(), std::make_shared<DataTypeInt32>(), + "test_int"); + + Block block({test_int}); + TypeDescriptor source_type(PrimitiveType::TYPE_INT); + IntegerTruncatePartitionColumnTransform transform(source_type, 10); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {0, -10}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_bigint_truncate_transform) { + const std::vector<int64_t> values({1, -1}); + auto column = ColumnInt64::create(); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_bigint(column->get_ptr(), std::make_shared<DataTypeInt64>(), + "test_bigint"); + + Block block({test_bigint}); + TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT); + BigintTruncatePartitionColumnTransform transform(source_type, 10); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt64*>(result.column.get())->get_data(); + std::vector<int64_t> expected_data = {0, -10}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_decimal32_truncate_transform) { + const std::vector<int32_t> values({1065}); + auto column = ColumnDecimal32::create(0, 2); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_decimal32(column->get_ptr(), + std::make_shared<DataTypeDecimal<Decimal32>>(4, 2), + "test_decimal32"); + + Block block({test_decimal32}); + TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2); + DecimalTruncatePartitionColumnTransform<Decimal32> transform(source_type, 50); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnDecimal32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {1050}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i].value); + } +} + +TEST_F(PartitionTransformersTest, test_string_truncate_transform) { + const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}}); + auto column = ColumnString::create(); + column->insert_many_strings(&values[0], values.size()); + ColumnWithTypeAndName test_string(column->get_ptr(), std::make_shared<DataTypeString>(), + "test_string"); + + Block block({test_string}); + TypeDescriptor source_type = TypeDescriptor::create_string_type(); + StringTruncatePartitionColumnTransform transform(source_type, 3); + + auto 
result = transform.apply(block, 0); + const auto result_column = assert_cast<const ColumnString*>(result.column.get()); + const char result_data[] = {'i', 'c', 'e'}; + std::vector<StringRef> expected_data = { + {result_data, sizeof(result_data) / sizeof(result_data[0])}}; + EXPECT_EQ(expected_data.size(), result_column->size()); + for (size_t i = 0; i < result_column->size(); ++i) { + EXPECT_EQ(expected_data[i], result_column->get_data_at(i)); + } +} + +TEST_F(PartitionTransformersTest, test_integer_bucket_transform) { + const std::vector<int32_t> values({34, -123}); // 2017239379, -471378254 + auto column = ColumnInt32::create(); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_int(column->get_ptr(), std::make_shared<DataTypeInt32>(), + "test_int"); + + Block block({test_int}); + TypeDescriptor source_type(PrimitiveType::TYPE_INT); + IntBucketPartitionColumnTransform transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {3, 2}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_bigint_bucket_transform) { + const std::vector<int64_t> values({34, -123}); // 2017239379, -471378254 + auto column = ColumnInt64::create(); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_bigint(column->get_ptr(), std::make_shared<DataTypeInt64>(), + "test_bigint"); + + Block block({test_bigint}); + TypeDescriptor source_type(PrimitiveType::TYPE_BIGINT); + BigintBucketPartitionColumnTransform transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {3, 2}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_decimal32_bucket_transform) { + const std::vector<int32_t> values({1420}); // -500754589 + auto column = ColumnDecimal32::create(0, 2); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_decimal32(column->get_ptr(), + std::make_shared<DataTypeDecimal<Decimal32>>(4, 2), + "test_decimal32"); + + Block block({test_decimal32}); + TypeDescriptor source_type = TypeDescriptor::create_decimalv3_type(4, 2); + DecimalBucketPartitionColumnTransform<Decimal32> transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {3}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_date_bucket_transform) { + auto column = ColumnDateV2::create(); + auto& date_v2_data = column->get_data(); + DateV2Value<DateV2ValueType> value; + value.set_time(2017, 11, 16, 0, 0, 0, 0); // -653330422 + date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); + ColumnWithTypeAndName test_date(column->get_ptr(), 
std::make_shared<DataTypeDateV2>(), + "test_date"); + + Block block({test_date}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2); + DateBucketPartitionColumnTransform transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {10}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_timestamp_bucket_transform) { + auto column = ColumnDateTimeV2::create(); + auto& datetime_v2_data = column->get_data(); + DateV2Value<DateTimeV2ValueType> value; + value.set_time(2017, 11, 16, 22, 31, 8, 0); // -2047944441 + datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); + ColumnWithTypeAndName test_timestamp(column->get_ptr(), std::make_shared<DataTypeDateTimeV2>(), + "test_timestamp"); + + Block block({test_timestamp}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2); + TimestampBucketPartitionColumnTransform transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {7}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_string_bucket_transform) { + const std::vector<StringRef> values({{"iceberg", sizeof("iceberg") - 1}}); // 1210000089 + auto column = ColumnString::create(); + column->insert_many_strings(&values[0], values.size()); + ColumnWithTypeAndName test_string(column->get_ptr(), std::make_shared<DataTypeString>(), + "test_string"); + + Block block({test_string}); + TypeDescriptor source_type(PrimitiveType::TYPE_STRING); + StringBucketPartitionColumnTransform transform(source_type, 16); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {9}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_date_year_transform) { + auto column = ColumnDateV2::create(); + auto& date_v2_data = column->get_data(); + DateV2Value<DateV2ValueType> value; + value.set_time(2017, 11, 16, 0, 0, 0, 0); + date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); + ColumnWithTypeAndName test_date(column->get_ptr(), std::make_shared<DataTypeDateV2>(), + "test_date"); + + Block block({test_date}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2); + DateYearPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {47}; + std::vector<std::string> expected_human_string = {"2017"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_timestamp_year_transform) { + auto 
column = ColumnDateTimeV2::create(); + auto& datetime_v2_data = column->get_data(); + DateV2Value<DateTimeV2ValueType> value; + value.set_time(2017, 11, 16, 22, 31, 8, 0); + datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); + ColumnWithTypeAndName test_timestamp(column->get_ptr(), std::make_shared<DataTypeDateTimeV2>(), + "test_timestamp"); + + Block block({test_timestamp}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2); + TimestampYearPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {47}; + std::vector<std::string> expected_human_string = {"2017"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_date_month_transform) { + auto column = ColumnDateV2::create(); + auto& date_v2_data = column->get_data(); + DateV2Value<DateV2ValueType> value; + value.set_time(2017, 11, 16, 0, 0, 0, 0); + date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); + ColumnWithTypeAndName test_date(column->get_ptr(), std::make_shared<DataTypeDateV2>(), + "test_date"); + + Block block({test_date}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2); + DateMonthPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {574}; + std::vector<std::string> expected_human_string = {"2017-11"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_timestamp_month_transform) { + auto column = ColumnDateTimeV2::create(); + auto& datetime_v2_data = column->get_data(); + DateV2Value<DateTimeV2ValueType> value; + value.set_time(2017, 11, 16, 22, 31, 8, 0); + datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); + ColumnWithTypeAndName test_timestamp(column->get_ptr(), std::make_shared<DataTypeDateTimeV2>(), + "test_timestamp"); + + Block block({test_timestamp}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2); + TimestampMonthPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {574}; + std::vector<std::string> expected_human_string = {"2017-11"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_date_day_transform) { + auto column = ColumnDateV2::create(); + auto& date_v2_data = column->get_data(); + DateV2Value<DateV2ValueType> value; + value.set_time(2017, 11, 16, 0, 0, 0, 0); + 
date_v2_data.push_back(*reinterpret_cast<vectorized::UInt32*>(&value)); + ColumnWithTypeAndName test_date(column->get_ptr(), std::make_shared<DataTypeDateV2>(), + "test_date"); + + Block block({test_date}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATEV2); + DateDayPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {17486}; + std::vector<std::string> expected_human_string = {"2017-11-16"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_timestamp_day_transform) { + auto column = ColumnDateTimeV2::create(); + auto& datetime_v2_data = column->get_data(); + DateV2Value<DateTimeV2ValueType> value; + value.set_time(2017, 11, 16, 22, 31, 8, 0); + datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); + ColumnWithTypeAndName test_timestamp(column->get_ptr(), std::make_shared<DataTypeDateTimeV2>(), + "test_timestamp"); + + Block block({test_timestamp}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2); + TimestampDayPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {17486}; + std::vector<std::string> expected_human_string = {"2017-11-16"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_timestamp_hour_transform) { + auto column = ColumnDateTimeV2::create(); + auto& datetime_v2_data = column->get_data(); + DateV2Value<DateTimeV2ValueType> value; + value.set_time(2017, 11, 16, 22, 31, 8, 0); + datetime_v2_data.push_back(*reinterpret_cast<vectorized::UInt64*>(&value)); + ColumnWithTypeAndName test_timestamp(column->get_ptr(), std::make_shared<DataTypeDateTimeV2>(), + "test_timestamp"); + + Block block({test_timestamp}); + TypeDescriptor source_type(PrimitiveType::TYPE_DATETIMEV2); + TimestampHourPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_data = assert_cast<const ColumnInt32*>(result.column.get())->get_data(); + std::vector<int32_t> expected_data = {419686}; + std::vector<std::string> expected_human_string = {"2017-11-16-22"}; + EXPECT_EQ(expected_data.size(), result_data.size()); + for (size_t i = 0; i < result_data.size(); ++i) { + EXPECT_EQ(expected_data[i], result_data[i]); + EXPECT_EQ(expected_human_string[i], + transform.to_human_string(transform.get_result_type(), result_data[i])); + } +} + +TEST_F(PartitionTransformersTest, test_void_transform) { + const std::vector<int32_t> values({1, -1}); + auto column = ColumnInt32::create(); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_int(column->get_ptr(), std::make_shared<DataTypeInt32>(), + "test_int"); + + Block block({test_int}); + TypeDescriptor 
source_type(PrimitiveType::TYPE_INT); + VoidPartitionColumnTransform transform(source_type); + + auto result = transform.apply(block, 0); + + const auto& result_null_map_data = + assert_cast<const ColumnNullable*>(result.column.get())->get_null_map_data(); + + for (size_t i = 0; i < result_null_map_data.size(); ++i) { + EXPECT_EQ(1, result_null_map_data[i]); + } +} + +TEST_F(PartitionTransformersTest, test_nullable_column_integer_truncate_transform) { + const std::vector<int32_t> values({1, -1}); + auto column = ColumnNullable::create(ColumnInt32::create(), ColumnUInt8::create()); + column->insert_data(nullptr, 0); + column->insert_many_fix_len_data(reinterpret_cast<const char*>(values.data()), values.size()); + ColumnWithTypeAndName test_int( + column->get_ptr(), + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()), "test_int"); + + Block block({test_int}); + TypeDescriptor source_type(PrimitiveType::TYPE_INT); + IntegerTruncatePartitionColumnTransform transform(source_type, 10); + + auto result = transform.apply(block, 0); + + std::vector<int32_t> expected_data = {0, -10}; + std::vector<std::string> expected_human_string = {"0", "-10"}; + const auto* result_column = assert_cast<const ColumnNullable*>(result.column.get()); + const auto& result_data = + assert_cast<const ColumnInt32*>(result_column->get_nested_column_ptr().get()) + ->get_data(); + const auto& null_map_column = result_column->get_null_map_column(); + + EXPECT_EQ(1, null_map_column[0]); + EXPECT_EQ(0, null_map_column[1]); + EXPECT_EQ(0, null_map_column[2]); + + for (size_t i = 0, j = 0; i < result_column->size(); ++i) { + if (null_map_column[i] == 0) { + EXPECT_EQ(expected_data[j], result_data[i]); + EXPECT_EQ(expected_human_string[j], + transform.to_human_string(transform.get_result_type(), result_data[i])); + ++j; + } + } +} + +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java new file mode 100644 index 00000000000..6fdb27e1d0b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
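// Illustrative usage sketch, not part of the committed class (the names below are
// hypothetical): with the equals()/hashCode() defined further down, SimpleTableInfo
// behaves as a plain (dbName, tbName) value object and can safely key a per-table map.
//
//   SimpleTableInfo info = new SimpleTableInfo("db1", "tbl1");
//   Map<SimpleTableInfo, Long> loadedRowsByTable = new HashMap<>();
//   loadedRowsByTable.put(info, 42L);
//   // a second instance with the same names retrieves the same entry
//   long rows = loadedRowsByTable.get(new SimpleTableInfo("db1", "tbl1"));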
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java +// and modified by Doris + +package org.apache.doris.common.info; + +import java.util.Objects; + +public class SimpleTableInfo { + + private final String dbName; + private final String tbName; + + public SimpleTableInfo(String dbName, String tbName) { + this.dbName = dbName; + this.tbName = tbName; + } + + public String getDbName() { + return dbName; + } + + public String getTbName() { + return tbName; + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tbName); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + SimpleTableInfo that = (SimpleTableInfo) other; + return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName); + } + + @Override + public String toString() { + return String.format("%s.%s", dbName, tbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index acda08b7378..68064c4e439 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -33,6 +33,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -85,6 +86,20 @@ public class IcebergMetadataCache { return tableCache.get(key); } + public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) { + Table restTable; + synchronized (this) { + Table table = getIcebergTable(catalog, dbName, tbName); + restTable = SerializableTable.copyOf(table); + } + return restTable; + } + + public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName); + return loadTable(key); + } + @NotNull private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) { Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName); @@ -116,7 +131,7 @@ public class IcebergMetadataCache { public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalog.getId() == catalogId) - .forEach(snapshotListCache::invalidate); + .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> entry.getKey().catalog.getId() == catalogId) @@ -130,7 +145,7 @@ public class IcebergMetadataCache { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( tblName)) - .forEach(snapshotListCache::invalidate); + .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index c7fef68ee97..a6933f83d76 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -64,6 +64,10 @@ public class IcebergMetadataOps implements ExternalMetadataOps { return catalog; } + public IcebergExternalCatalog getExternalCatalog() { + return dorisCatalog; + } + @Override public void close() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 5025e075142..a3a978ccd7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -21,31 +21,39 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TFileContent; +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper; +import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; import org.apache.doris.thrift.TIcebergCommitData; +import org.apache.doris.thrift.TUpdateMode; import org.apache.doris.transaction.Transaction; -import com.google.common.base.VerifyException; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.Metrics; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.WriteResult; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; public class IcebergTransaction implements Transaction { private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class); + private final IcebergMetadataOps ops; + private SimpleTableInfo tableInfo; + private Table table; + + private org.apache.iceberg.Transaction transaction; private final List<TIcebergCommitData> commitDataList = Lists.newArrayList(); @@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction { } } - public void beginInsert(String dbName, String tbName) { - Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); - transaction = icebergTable.newTransaction(); + public void beginInsert(SimpleTableInfo tableInfo) { + this.tableInfo = tableInfo; + this.table = getNativeTable(tableInfo); + this.transaction = table.newTransaction(); } - public void finishInsert() { - Table icebergTable = transaction.table(); - AppendFiles appendFiles = transaction.newAppend(); - - for (CommitTaskData task : convertToCommitTaskData()) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(IcebergUtils.getFileFormat(icebergTable)) - .withMetrics(task.getMetrics()); - - if (icebergTable.spec().isPartitioned()) { - List<String> 
partitionValues = task.getPartitionValues() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartitionValues(partitionValues); - } - appendFiles.appendFile(builder.build()); + public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { + if (LOG.isDebugEnabled()) { + LOG.info("iceberg table {} insert table finished!", tableInfo); } - // in appendFiles.commit, it will generate metadata(manifest and snapshot) - // after appendFiles.commit, in current transaction, you can already see the new snapshot - appendFiles.commit(); - } - - public List<CommitTaskData> convertToCommitTaskData() { - List<CommitTaskData> commitTaskData = new ArrayList<>(); - for (TIcebergCommitData data : this.commitDataList) { - commitTaskData.add(new CommitTaskData( - data.getFilePath(), - data.getFileSize(), - new Metrics( - data.getRowCount(), - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP - ), - data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), - convertToFileContent(data.getFileContent()), - data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty() - )); + //create and start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; } - return commitTaskData; + updateManifestAfterInsert(updateMode); } - private FileContent convertToFileContent(TFileContent content) { - if (content.equals(TFileContent.DATA)) { - return FileContent.DATA; - } else if (content.equals(TFileContent.POSITION_DELETES)) { - return FileContent.POSITION_DELETES; + private void updateManifestAfterInsert(TUpdateMode updateMode) { + PartitionSpec spec = table.spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(table); + + //convert commitDataList to writeResult + WriteResult writeResult = IcebergWriterHelper + .convertToWriterResult(fileFormat, spec, commitDataList); + List<WriteResult> pendingResults = Lists.newArrayList(writeResult); + + if (spec.isPartitioned()) { + partitionManifestUpdate(updateMode, table, pendingResults); + if (LOG.isDebugEnabled()) { + LOG.info("{} {} table partition manifest successful and writeResult : {}..", tableInfo, updateMode, + writeResult); + } } else { - return FileContent.EQUALITY_DELETES; + tableManifestUpdate(updateMode, table, pendingResults); + if (LOG.isDebugEnabled()) { + LOG.info("{} {} table manifest successful and writeResult : {}..", tableInfo, updateMode, + writeResult); + } } } @Override public void commit() throws UserException { - // Externally readable - // Manipulate the relevant data so that others can also see the latest table, such as: - // 1. hadoop: it will change the version number information in 'version-hint.text' - // 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location' - // 3. and so on ... 
+ // commit the iceberg transaction transaction.commitTransaction(); } @Override public void rollback() { - + //do nothing } public long getUpdateCnt() { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - public static class CommitTaskData { - private final String path; - private final long fileSizeInBytes; - private final Metrics metrics; - private final Optional<List<String>> partitionValues; - private final FileContent content; - private final Optional<List<String>> referencedDataFiles; - - public CommitTaskData(String path, - long fileSizeInBytes, - Metrics metrics, - Optional<List<String>> partitionValues, - FileContent content, - Optional<List<String>> referencedDataFiles) { - this.path = path; - this.fileSizeInBytes = fileSizeInBytes; - this.metrics = metrics; - this.partitionValues = convertPartitionValuesForNull(partitionValues); - this.content = content; - this.referencedDataFiles = referencedDataFiles; - } - private Optional<List<String>> convertPartitionValuesForNull(Optional<List<String>> partitionValues) { - if (!partitionValues.isPresent()) { - return partitionValues; - } - List<String> values = partitionValues.get(); - if (!values.contains("null")) { - return partitionValues; - } - return Optional.of(values.stream().map(s -> s.equals("null") ? null : s).collect(Collectors.toList())); - } + private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { + Objects.requireNonNull(tableInfo); + IcebergExternalCatalog externalCatalog = ops.getExternalCatalog(); + return IcebergUtils.getRemoteTable(externalCatalog, tableInfo); + } - public String getPath() { - return path; + private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { + if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) { + LOG.warn("{} partitionManifestUp method call but pendingResults is null or empty!", table.name()); + return; } - - public long getFileSizeInBytes() { - return fileSizeInBytes; + // Commit the appendPartitionOperator transaction. + if (updateMode == TUpdateMode.APPEND) { + commitAppendTxn(table, pendingResults); + } else { + ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files."); + Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); + } + appendPartitionOp.commit(); } + } - public Metrics getMetrics() { - return metrics; + private void tableManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { + if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) { + LOG.warn("{} tableManifestUp method call but pendingResults is null or empty!", table.name()); + return; } - - public Optional<List<String>> getPartitionValues() { - return partitionValues; + // Commit the appendPartitionOperator transaction. 
+ if (LOG.isDebugEnabled()) { + LOG.info("{} tableManifestUp method call ", table.name()); } - - public FileContent getContent() { - return content; + if (updateMode == TUpdateMode.APPEND) { + commitAppendTxn(table, pendingResults); + } else { + ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files."); + Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); + } + appendPartitionOp.commit(); } + } - public Optional<List<String>> getReferencedDataFiles() { - return referencedDataFiles; + + private void commitAppendTxn(Table table, List<WriteResult> pendingResults) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } + appendFiles.commit(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 2aa5dda35a4..512e6a3ee93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; @@ -50,6 +51,7 @@ import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; import com.google.common.collect.Lists; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -87,6 +89,8 @@ public class IcebergUtils { // https://iceberg.apache.org/spec/#schemas-and-data-types // All time and timestamp values are stored with microsecond precision private static final int ICEBERG_DATETIME_SCALE_MS = 6; + private static final String PARQUET_NAME = "parquet"; + private static final String ORC_NAME = "orc"; public static final String TOTAL_RECORDS = "total-records"; public static final String TOTAL_POSITION_DELETES = "total-position-deletes"; @@ -522,8 +526,8 @@ public class IcebergUtils { case MAP: Types.MapType map = (Types.MapType) type; return new MapType( - icebergTypeToDorisType(map.keyType()), - icebergTypeToDorisType(map.valueType()) + icebergTypeToDorisType(map.keyType()), + icebergTypeToDorisType(map.valueType()) ); case STRUCT: Types.StructType struct = (Types.StructType) type; @@ -536,11 +540,30 @@ public class IcebergUtils { } } + public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) { + return getIcebergTableInternal(catalog, dbName, tblName, false); + } + + public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return getIcebergTableInternal(catalog, tableInfo.getDbName(), tableInfo.getTbName(), true); + } + + public static org.apache.iceberg.Table 
getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { return Env.getCurrentEnv() .getExtMetaCacheMgr() .getIcebergMetadataCache() - .getIcebergTable(catalog, dbName, tblName); + .getRemoteTable(catalog, tableInfo.getDbName(), tableInfo.getTbName()); + } + + private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName, + String tblName, + boolean isClone) { + IcebergMetadataCache metadataCache = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache(); + return isClone ? metadataCache.getAndCloneTable(catalog, dbName, tblName) + : metadataCache.getIcebergTable(catalog, dbName, tblName); } /** @@ -587,17 +610,27 @@ public class IcebergUtils { return -1; } - public static String getFileFormat(Table table) { - Map<String, String> properties = table.properties(); + + public static FileFormat getFileFormat(Table icebergTable) { + Map<String, String> properties = icebergTable.properties(); + String fileFormatName; if (properties.containsKey(WRITE_FORMAT)) { - return properties.get(WRITE_FORMAT); + fileFormatName = properties.get(WRITE_FORMAT); + } else { + fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME); } - if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { - return properties.get(TableProperties.DEFAULT_FILE_FORMAT); + FileFormat fileFormat; + if (fileFormatName.toLowerCase().contains(ORC_NAME)) { + fileFormat = FileFormat.ORC; + } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) { + fileFormat = FileFormat.PARQUET; + } else { + throw new RuntimeException("Unsupported input format type: " + fileFormatName); } - return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + return fileFormat; } + public static String getFileCompress(Table table) { Map<String, String> properties = table.properties(); if (properties.containsKey(COMPRESSION_CODEC)) { @@ -605,11 +638,11 @@ public class IcebergUtils { } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) { return properties.get(SPARK_SQL_COMPRESSION_CODEC); } - String fileFormat = getFileFormat(table); - if (fileFormat.equalsIgnoreCase("parquet")) { + FileFormat fileFormat = getFileFormat(table); + if (fileFormat == FileFormat.PARQUET) { return properties.getOrDefault( TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); - } else if (fileFormat.equalsIgnoreCase("orc")) { + } else if (fileFormat == FileFormat.ORC) { return properties.getOrDefault( TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT); } @@ -620,9 +653,10 @@ public class IcebergUtils { Map<String, String> properties = table.properties(); if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) { throw new NotSupportedException( - "Table " + table.name() + " specifies " + properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) - + " as a location provider. " - + "Writing to Iceberg tables with custom location provider is not supported."); + "Table " + table.name() + " specifies " + properties + .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) + + " as a location provider. 
" + + "Writing to Iceberg tables with custom location provider is not supported."); } String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); if (dataLocation == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java new file mode 100644 index 00000000000..4171a0536f9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.helper; + +import org.apache.doris.datasource.statistics.CommonStatistics; +import org.apache.doris.thrift.TIcebergCommitData; + +import com.google.common.base.VerifyException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.io.WriteResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +public class IcebergWriterHelper { + + private static final int DEFAULT_FILE_COUNT = 1; + + public static WriteResult convertToWriterResult( + FileFormat format, + PartitionSpec spec, + List<TIcebergCommitData> commitDataList) { + List<DataFile> dataFiles = new ArrayList<>(); + for (TIcebergCommitData commitData : commitDataList) { + //get the files path + String location = commitData.getFilePath(); + + //get the commit file statistics + long fileSize = commitData.getFileSize(); + long recordCount = commitData.getRowCount(); + CommonStatistics stat = new CommonStatistics(recordCount, DEFAULT_FILE_COUNT, fileSize); + + Optional<List<String>> partValues = Optional.empty(); + //get and check partitionValues when table is partitionedTable + if (spec.isPartitioned()) { + List<String> partitionValues = commitData.getPartitionValues(); + if (Objects.isNull(partitionValues) || partitionValues.isEmpty()) { + throw new VerifyException("No partition data for partitioned table"); + } + partitionValues = partitionValues.stream().map(s -> s.equals("null") ? 
null : s) + .collect(Collectors.toList()); + partValues = Optional.of(partitionValues); + } + DataFile dataFile = genDataFile(format, location, spec, partValues, stat); + dataFiles.add(dataFile); + } + return WriteResult.builder() + .addDataFiles(dataFiles) + .build(); + + } + + public static DataFile genDataFile( + FileFormat format, + String location, + PartitionSpec spec, + Optional<List<String>> partValues, + CommonStatistics statistics) { + + DataFiles.Builder builder = DataFiles.builder(spec) + .withPath(location) + .withFileSizeInBytes(statistics.getTotalFileBytes()) + .withRecordCount(statistics.getRowCount()) + .withFormat(format); + + partValues.ifPresent(builder::withPartitionValues); + + return builder.build(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java index e590e918344..56ff188f964 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java @@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource { @Override public String getFileFormat() { - return IcebergUtils.getFileFormat(originTable); + return IcebergUtils.getFileFormat(originTable).name(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java index 06b785a15f8..5e9860171d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java @@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource { private final org.apache.iceberg.Table icebergTable; public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, - Map<String, ColumnRange> columnNameToRange) { + Map<String, ColumnRange> columnNameToRange) { this.hmsTable = hmsTable; this.desc = desc; this.columnNameToRange = columnNameToRange; @@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource { @Override public String getFileFormat() throws DdlException, MetaNotFoundException { - return IcebergUtils.getFileFormat(icebergTable); + return IcebergUtils.getFileFormat(icebergTable).name(); } public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java new file mode 100644 index 00000000000..9685dfdf35a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.statistics; + +public class CommonStatistics { + + public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L); + + private final long rowCount; + private final long fileCount; + private final long totalFileBytes; + + public CommonStatistics(long rowCount, long fileCount, long totalFileBytes) { + this.fileCount = fileCount; + this.rowCount = rowCount; + this.totalFileBytes = totalFileBytes; + } + + public long getRowCount() { + return rowCount; + } + + public long getFileCount() { + return fileCount; + } + + public long getTotalFileBytes() { + return totalFileBytes; + } + + public static CommonStatistics reduce( + CommonStatistics current, + CommonStatistics update, + ReduceOperator operator) { + return new CommonStatistics( + reduce(current.getRowCount(), update.getRowCount(), operator), + reduce(current.getFileCount(), update.getFileCount(), operator), + reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator)); + } + + public static long reduce(long current, long update, ReduceOperator operator) { + if (current >= 0 && update >= 0) { + switch (operator) { + case ADD: + return current + update; + case SUBTRACT: + return current - update; + case MAX: + return Math.max(current, update); + case MIN: + return Math.min(current, update); + default: + throw new IllegalArgumentException("Unexpected operator: " + operator); + } + } + + return 0; + } + + public enum ReduceOperator { + ADD, + SUBTRACT, + MIN, + MAX, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java index b19c483c9f3..86b1f1ef0b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.nereids.NereidsPlanner; @@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { * constructor */ public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table, - String labelName, NereidsPlanner planner, - Optional<InsertCommandContext> insertCtx, - boolean emptyInsert) { + String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx, + boolean emptyInsert) { super(ctx, table, labelName, planner, insertCtx, emptyInsert); } @@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData); } + @Override + protected void beforeExec() { + String dbName = ((IcebergExternalTable) table).getDbName(); + String tbName 
= table.getName(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); + IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); + transaction.beginInsert(tableInfo); + } + @Override protected void doBeforeCommit() throws UserException { + String dbName = ((IcebergExternalTable) table).getDbName(); + String tbName = table.getName(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); - loadedRows = transaction.getUpdateCnt(); - transaction.finishInsert(); + this.loadedRows = transaction.getUpdateCnt(); + transaction.finishInsert(tableInfo, insertCtx); } @Override @@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { return TransactionType.ICEBERG; } - @Override - protected void beforeExec() { - IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); - transaction.beginInsert(((IcebergExternalTable) table).getDbName(), table.getName()); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java index 659be7cb1fe..0e01b599964 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java @@ -121,7 +121,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink { } // file info - tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable))); + tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name())); tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable))); // hadoop config diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java index 4f4fe956d4b..f373c133685 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java @@ -17,6 +17,7 @@ package org.apache.doris.transaction; + import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; @@ -58,12 +59,12 @@ public class IcebergTransactionManager implements TransactionManager { } @Override - public Transaction getTransaction(long id) { + public IcebergTransaction getTransaction(long id) { return getTransactionWithException(id); } - public Transaction getTransactionWithException(long id) { - Transaction icebergTransaction = transactions.get(id); + public IcebergTransaction getTransactionWithException(long id) { + IcebergTransaction icebergTransaction = transactions.get(id); if (icebergTransaction == null) { throw new RuntimeException("Can't find transaction for " + id); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java index 10de5427902..4375dc5c025 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java @@ -18,15 +18,22 @@ package org.apache.doris.datasource.iceberg; import 
org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.thrift.TFileContent; import org.apache.doris.thrift.TIcebergCommitData; +import com.google.common.collect.Maps; +import mockit.Mock; +import mockit.MockUp; +import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expression; @@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -50,23 +58,26 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; public class IcebergTransactionTest { - public static String dbName = "db3"; - public static String tbWithPartition = "tbWithPartition"; - public static String tbWithoutPartition = "tbWithoutPartition"; - public static IcebergMetadataOps ops; - public static Schema schema; + private static String dbName = "db3"; + private static String tbWithPartition = "tbWithPartition"; + private static String tbWithoutPartition = "tbWithoutPartition"; - @BeforeClass - public static void beforeClass() throws IOException { + private IcebergExternalCatalog externalCatalog; + private IcebergMetadataOps ops; + + + @Before + public void init() throws IOException { createCatalog(); createTable(); } - public static void createCatalog() throws IOException { + private void createCatalog() throws IOException { Path warehousePath = Files.createTempDirectory("test_warehouse_"); String warehouse = "file://" + warehousePath.toAbsolutePath() + "/"; HadoopCatalog hadoopCatalog = new HadoopCatalog(); @@ -74,25 +85,32 @@ public class IcebergTransactionTest { props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); hadoopCatalog.setConf(new Configuration()); hadoopCatalog.initialize("df", props); - ops = new IcebergMetadataOps(null, hadoopCatalog); + this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), ""); + new MockUp<IcebergHMSExternalCatalog>() { + @Mock + public Catalog getCatalog() { + return hadoopCatalog; + } + }; + ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog); } - public static void createTable() throws IOException { + private void createTable() throws IOException { HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog(); icebergCatalog.createNamespace(Namespace.of(dbName)); - schema = new Schema( - Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()), - Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()), - Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()), - Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()), - Types.NestedField.required(15, "dt1", Types.DateType.get()), - 
Types.NestedField.required(16, "dt2", Types.DateType.get()), - Types.NestedField.required(17, "dt3", Types.DateType.get()), - Types.NestedField.required(18, "dt4", Types.DateType.get()), - Types.NestedField.required(19, "str1", Types.StringType.get()), - Types.NestedField.required(20, "str2", Types.StringType.get()), - Types.NestedField.required(21, "int1", Types.IntegerType.get()), - Types.NestedField.required(22, "int2", Types.IntegerType.get()) + Schema schema = new Schema( + Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()), + Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()), + Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()), + Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()), + Types.NestedField.required(15, "dt1", Types.DateType.get()), + Types.NestedField.required(16, "dt2", Types.DateType.get()), + Types.NestedField.required(17, "dt3", Types.DateType.get()), + Types.NestedField.required(18, "dt4", Types.DateType.get()), + Types.NestedField.required(19, "str1", Types.StringType.get()), + Types.NestedField.required(20, "str2", Types.StringType.get()), + Types.NestedField.required(21, "int1", Types.IntegerType.get()), + Types.NestedField.required(22, "int2", Types.IntegerType.get()) ); PartitionSpec partitionSpec = PartitionSpec.builderFor(schema) @@ -112,7 +130,7 @@ public class IcebergTransactionTest { icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema); } - public List<String> createPartitionValues() { + private List<String> createPartitionValues() { Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z"); long ts = DateTimeUtil.microsFromInstant(instant); @@ -165,14 +183,23 @@ public class IcebergTransactionTest { ctdList.add(ctd1); ctdList.add(ctd2); + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition)); + + new MockUp<IcebergUtils>() { + @Mock + public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return table; + } + }; + IcebergTransaction txn = getTxn(); txn.updateIcebergCommitData(ctdList); - txn.beginInsert(dbName, tbWithPartition); - txn.finishInsert(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition); + txn.beginInsert(tableInfo); + txn.finishInsert(tableInfo, Optional.empty()); txn.commit(); - Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition)); - checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); + checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); checkPushDownByPartitionForTs(table, "ts1"); checkPushDownByPartitionForTs(table, "ts2"); checkPushDownByPartitionForTs(table, "ts3"); @@ -189,7 +216,7 @@ public class IcebergTransactionTest { checkPushDownByPartitionForBucketInt(table, "int1"); } - public void checkPushDownByPartitionForBucketInt(Table table, String column) { + private void checkPushDownByPartitionForBucketInt(Table table, String column) { // (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0 Integer i1 = 15; @@ -212,12 +239,12 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan2, 2); } - public void checkPushDownByPartitionForString(Table table, String column) { + private void checkPushDownByPartitionForString(Table table, String column) { // Since the string used to create the partition is in date format, the date check can be reused directly checkPushDownByPartitionForDt(table, column); } - public void 
checkPushDownByPartitionForTs(Table table, String column) { + private void checkPushDownByPartitionForTs(Table table, String column) { String lessTs = "2023-12-11T12:34:56.123456"; String eqTs = "2024-12-11T12:34:56.123456"; String greaterTs = "2025-12-11T12:34:56.123456"; @@ -230,7 +257,7 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan, 0); } - public void checkPushDownByPartitionForDt(Table table, String column) { + private void checkPushDownByPartitionForDt(Table table, String column) { String less = "2023-12-11"; String eq = "2024-12-11"; String greater = "2025-12-11"; @@ -243,7 +270,7 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan, 0); } - public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) { + private void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) { CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles(); AtomicReference<Integer> cnt = new AtomicReference<>(0); fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1)); @@ -268,45 +295,64 @@ public class IcebergTransactionTest { ctdList.add(ctd1); ctdList.add(ctd2); + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); + new MockUp<IcebergUtils>() { + @Mock + public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return table; + } + }; + IcebergTransaction txn = getTxn(); txn.updateIcebergCommitData(ctdList); - txn.beginInsert(dbName, tbWithoutPartition); - txn.finishInsert(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition); + txn.beginInsert(tableInfo); + txn.finishInsert(tableInfo, Optional.empty()); txn.commit(); - Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); } - public void checkSnapshotProperties(Map<String, String> props, - String addRecords, - String addFileCnt, - String addFileSize) { + private IcebergTransaction getTxn() { + return new IcebergTransaction(ops); + } + + private void checkSnapshotProperties(Map<String, String> props, + String addRecords, + String addFileCnt, + String addFileSize) { Assert.assertEquals(addRecords, props.get("added-records")); Assert.assertEquals(addFileCnt, props.get("added-data-files")); Assert.assertEquals(addFileSize, props.get("added-files-size")); } - public String numToYear(Integer num) { + private String numToYear(Integer num) { Transform<Object, Integer> year = Transforms.year(); return year.toHumanString(Types.IntegerType.get(), num); } - public String numToMonth(Integer num) { + private String numToMonth(Integer num) { Transform<Object, Integer> month = Transforms.month(); return month.toHumanString(Types.IntegerType.get(), num); } - public String numToDay(Integer num) { + private String numToDay(Integer num) { Transform<Object, Integer> day = Transforms.day(); return day.toHumanString(Types.IntegerType.get(), num); } - public String numToHour(Integer num) { + private String numToHour(Integer num) { Transform<Object, Integer> hour = Transforms.hour(); return hour.toHumanString(Types.IntegerType.get(), num); } + @Test + public void tableCloneTest() { + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); + Table cloneTable = (Table) SerializationUtils.clone((Serializable) table); + Assert.assertNotNull(cloneTable); + } + @Test public void testTransform() { 
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z"); @@ -322,7 +368,4 @@ public class IcebergTransactionTest { Assert.assertEquals("2024-12-11", numToDay(dt)); } - public IcebergTransaction getTxn() { - return new IcebergTransaction(ops); - } }
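
Editor's note (not part of the diff above): a minimal sketch of the FE-side commit flow this patch introduces, mirroring the call sequence exercised in IcebergTransactionTest. The transaction is now driven by a SimpleTableInfo (db + table) instead of raw name strings, and finishInsert() additionally receives the optional InsertCommandContext. The `ops` (IcebergMetadataOps) and `commitDataList` (List<TIcebergCommitData> collected from the BE writers) are assumed to be set up by the caller, as in the test.

    // Sketch only: mirrors the sequence used in IcebergTransactionTest.
    SimpleTableInfo tableInfo = new SimpleTableInfo("db3", "tbWithPartition");
    IcebergTransaction txn = new IcebergTransaction(ops);
    txn.updateIcebergCommitData(commitDataList);      // commit data reported by the BE writers
    txn.beginInsert(tableInfo);
    txn.finishInsert(tableInfo, Optional.empty());    // java.util.Optional<InsertCommandContext>
    txn.commit();                                     // produces the snapshot checked via table.currentSnapshot()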
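
A small usage note for the new CommonStatistics helper (values below are hypothetical, chosen only for illustration): reduce() applies the operator field by field (rowCount, fileCount, totalFileBytes) and, per the code above, returns 0 whenever either operand is negative.

    // Hypothetical values, illustrating CommonStatistics.reduce from this patch.
    CommonStatistics current = new CommonStatistics(100L, 2L, 4096L);
    CommonStatistics update = new CommonStatistics(50L, 1L, 1024L);
    CommonStatistics merged = CommonStatistics.reduce(
            current, update, CommonStatistics.ReduceOperator.ADD);
    // merged: rowCount=150, fileCount=3, totalFileBytes=5120
    // CommonStatistics.reduce(-1L, 7L, CommonStatistics.ReduceOperator.ADD) returns 0.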