This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new fdf5d055972 branch-3.1: [fix](iceberg) fix the iceberg timestamp_ntz
write schema and values bug. #51384 (#52478)
fdf5d055972 is described below
commit fdf5d055972380b16e0e22b294ab4841a9311af3
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 13:28:29 2025 +0800
branch-3.1: [fix](iceberg) fix the iceberg timestamp_ntz write schema and
values bug. #51384 (#52478)
Cherry-picked from #51384
Co-authored-by: kang <[email protected]>
---
be/src/pipeline/exec/result_sink_operator.h | 4 ++
.../serde/data_type_datetimev2_serde.cpp | 7 ++-
.../format/table/iceberg/arrow_schema_util.cpp | 4 +-
be/src/vec/runtime/vparquet_transformer.cpp | 45 +++++++-------
be/src/vec/runtime/vparquet_transformer.h | 25 ++++----
.../writer/iceberg/viceberg_partition_writer.cpp | 8 +--
be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +-
be/src/vec/sink/writer/vhive_partition_writer.cpp | 8 +--
.../create_preinstalled_scripts/iceberg/run12.sql | 13 ++++
.../fileformat/ParquetFileFormatProperties.java | 12 ++++
.../ParquetFileFormatPropertiesTest.java | 11 ++++
gensrc/thrift/DataSinks.thrift | 5 ++
.../write/test_iceberg_write_timestamp_ntz.out | Bin 0 -> 257 bytes
.../write/test_iceberg_write_timestamp_ntz.groovy | 68 +++++++++++++++++++++
14 files changed, 170 insertions(+), 45 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 339c1678256..06544f9bc18 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -47,6 +47,7 @@ struct ResultFileOptions {
TParquetCompressionType::type parquet_commpression_type;
TParquetVersion::type parquet_version;
bool parquert_disable_dictionary;
+ bool enable_int96_timestamps;
//note: use outfile with parquet format, have deprecated 9:schema and
10:file_properties
//But in order to consider the compatibility when upgrading, so add a bool
to check
//Now the code version is 1.1.2, so when the version is after 1.2, could
remove this code.
@@ -103,6 +104,9 @@ struct ResultFileOptions {
if (t_opt.__isset.parquet_version) {
parquet_version = t_opt.parquet_version;
}
+ if (t_opt.__isset.enable_int96_timestamps) {
+ enable_int96_timestamps = t_opt.enable_int96_timestamps;
+ }
if (t_opt.__isset.orc_schema) {
orc_schema = t_opt.orc_schema;
}
diff --git a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
index 3b4142a2c01..2032dcda9fc 100644
--- a/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
@@ -18,6 +18,7 @@
#include "data_type_datetimev2_serde.h"
#include <arrow/builder.h>
+#include <cctz/time_zone.h>
#include <chrono> // IWYU pragma: keep
@@ -110,6 +111,10 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const
IColumn& column, const
int end, const
cctz::time_zone& ctz) const {
const auto& col_data = static_cast<const
ColumnVector<UInt64>&>(column).get_data();
auto& timestamp_builder =
assert_cast<arrow::TimestampBuilder&>(*array_builder);
+ std::shared_ptr<arrow::TimestampType> timestamp_type =
+
std::static_pointer_cast<arrow::TimestampType>(array_builder->type());
+ const std::string& timezone = timestamp_type->timezone();
+ const cctz::time_zone& real_ctz = timezone == "" ? cctz::utc_time_zone() :
ctz;
for (size_t i = start; i < end; ++i) {
if (null_map && (*null_map)[i]) {
checkArrowStatus(timestamp_builder.AppendNull(), column.get_name(),
@@ -118,7 +123,7 @@ void DataTypeDateTimeV2SerDe::write_column_to_arrow(const
IColumn& column, const
int64_t timestamp = 0;
DateV2Value<DateTimeV2ValueType> datetime_val =
binary_cast<UInt64,
DateV2Value<DateTimeV2ValueType>>(col_data[i]);
- datetime_val.unix_timestamp(&timestamp, ctz);
+ datetime_val.unix_timestamp(&timestamp, real_ctz);
if (scale > 3) {
uint32_t microsecond = datetime_val.microsecond();
diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
index 35a4d51b7f1..eac0c7827ab 100644
--- a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
@@ -70,7 +70,9 @@ Status ArrowSchemaUtil::convert_to(const
iceberg::NestedField& field,
break;
case iceberg::TypeID::TIMESTAMP: {
- arrow_type =
std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
+ iceberg::TimestampType* t_type =
static_cast<iceberg::TimestampType*>(field.field_type());
+ std::string real_tz = t_type->should_adjust_to_utc() ? timezone : "";
+ arrow_type =
std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, real_tz);
break;
}
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp
b/be/src/vec/runtime/vparquet_transformer.cpp
index 8343cf3b468..d7e14ff4573 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -176,18 +176,17 @@ void
ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
}
}
-VParquetTransformer::VParquetTransformer(
- RuntimeState* state, doris::io::FileWriter* file_writer,
- const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string>
column_names,
- TParquetCompressionType::type compression_type, bool
parquet_disable_dictionary,
- TParquetVersion::type parquet_version, bool output_object_data,
- const std::string* iceberg_schema_json, const iceberg::Schema*
iceberg_schema)
+VParquetTransformer::VParquetTransformer(RuntimeState* state,
doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs&
output_vexpr_ctxs,
+ std::vector<std::string> column_names,
+ bool output_object_data,
+ const ParquetFileOptions&
parquet_options,
+ const std::string*
iceberg_schema_json,
+ const iceberg::Schema* iceberg_schema)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_column_names(std::move(column_names)),
_parquet_schemas(nullptr),
- _compression_type(compression_type),
- _parquet_disable_dictionary(parquet_disable_dictionary),
- _parquet_version(parquet_version),
+ _parquet_options(parquet_options),
_iceberg_schema_json(iceberg_schema_json),
_iceberg_schema(iceberg_schema) {
_outstream = std::shared_ptr<ParquetOutputStream>(new
ParquetOutputStream(file_writer));
@@ -196,16 +195,12 @@ VParquetTransformer::VParquetTransformer(
VParquetTransformer::VParquetTransformer(RuntimeState* state,
doris::io::FileWriter* file_writer,
const VExprContextSPtrs&
output_vexpr_ctxs,
const std::vector<TParquetSchema>&
parquet_schemas,
- TParquetCompressionType::type
compression_type,
- bool parquet_disable_dictionary,
- TParquetVersion::type parquet_version,
bool output_object_data,
+ const ParquetFileOptions&
parquet_options,
const std::string*
iceberg_schema_json)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_parquet_schemas(&parquet_schemas),
- _compression_type(compression_type),
- _parquet_disable_dictionary(parquet_disable_dictionary),
- _parquet_version(parquet_version),
+ _parquet_options(parquet_options),
_iceberg_schema_json(iceberg_schema_json) {
_iceberg_schema = nullptr;
_outstream = std::shared_ptr<ParquetOutputStream>(new
ParquetOutputStream(file_writer));
@@ -214,10 +209,12 @@ VParquetTransformer::VParquetTransformer(RuntimeState*
state, doris::io::FileWri
Status VParquetTransformer::_parse_properties() {
try {
arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool();
+
+ //build parquet writer properties
parquet::WriterProperties::Builder builder;
- ParquetBuildHelper::build_compression_type(builder, _compression_type);
- ParquetBuildHelper::build_version(builder, _parquet_version);
- if (_parquet_disable_dictionary) {
+ ParquetBuildHelper::build_compression_type(builder,
_parquet_options.compression_type);
+ ParquetBuildHelper::build_version(builder,
_parquet_options.parquet_version);
+ if (_parquet_options.parquet_disable_dictionary) {
builder.disable_dictionary();
} else {
builder.enable_dictionary();
@@ -227,10 +224,14 @@ Status VParquetTransformer::_parse_properties() {
builder.max_row_group_length(std::numeric_limits<int64_t>::max());
builder.memory_pool(pool);
_parquet_writer_properties = builder.build();
- _arrow_properties = parquet::ArrowWriterProperties::Builder()
- .enable_deprecated_int96_timestamps()
- ->store_schema()
- ->build();
+
+ //build arrow writer properties
+ parquet::ArrowWriterProperties::Builder arrow_builder;
+ if (_parquet_options.enable_int96_timestamps) {
+ arrow_builder.enable_deprecated_int96_timestamps();
+ }
+ arrow_builder.store_schema();
+ _arrow_properties = arrow_builder.build();
} catch (const parquet::ParquetException& e) {
return Status::InternalError("parquet writer parse properties error:
{}", e.what());
}
diff --git a/be/src/vec/runtime/vparquet_transformer.h
b/be/src/vec/runtime/vparquet_transformer.h
index 5f3173af181..833155524f7 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -88,23 +88,28 @@ public:
const TypeDescriptor& type_desc);
};
+struct ParquetFileOptions {
+ TParquetCompressionType::type compression_type;
+ TParquetVersion::type parquet_version;
+ bool parquet_disable_dictionary = false;
+ bool enable_int96_timestamps = false;
+};
+
// a wrapper of parquet output stream
class VParquetTransformer final : public VFileFormatTransformer {
public:
VParquetTransformer(RuntimeState* state, doris::io::FileWriter*
file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
- std::vector<std::string> column_names,
- TParquetCompressionType::type compression_type,
- bool parquet_disable_dictionary, TParquetVersion::type
parquet_version,
- bool output_object_data, const std::string*
iceberg_schema_json = nullptr,
+ std::vector<std::string> column_names, bool
output_object_data,
+ const ParquetFileOptions& parquet_options,
+ const std::string* iceberg_schema_json = nullptr,
const iceberg::Schema* iceberg_schema = nullptr);
VParquetTransformer(RuntimeState* state, doris::io::FileWriter*
file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
- const std::vector<TParquetSchema>& parquet_schemas,
- TParquetCompressionType::type compression_type,
- bool parquet_disable_dictionary, TParquetVersion::type
parquet_version,
- bool output_object_data, const std::string*
iceberg_schema_json = nullptr);
+ const std::vector<TParquetSchema>& parquet_schemas,
bool output_object_data,
+ const ParquetFileOptions& parquet_options,
+ const std::string* iceberg_schema_json = nullptr);
~VParquetTransformer() override = default;
@@ -129,9 +134,7 @@ private:
std::vector<std::string> _column_names;
const std::vector<TParquetSchema>* _parquet_schemas = nullptr;
- const TParquetCompressionType::type _compression_type;
- const bool _parquet_disable_dictionary;
- const TParquetVersion::type _parquet_version;
+ const ParquetFileOptions _parquet_options;
const std::string* _iceberg_schema_json;
uint64_t _write_size = 0;
const iceberg::Schema* _iceberg_schema;
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index aeaa81d9995..933c8d3f562 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -64,7 +64,6 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
- bool parquet_disable_dictionary = false;
TParquetCompressionType::type parquet_compression_type;
switch (_compress_type) {
case TFileCompressType::PLAIN: {
@@ -84,10 +83,11 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
to_string(_compress_type));
}
}
+ ParquetFileOptions parquet_options = {parquet_compression_type,
+ TParquetVersion::PARQUET_1_0,
false, false};
_file_format_transformer.reset(new VParquetTransformer(
- state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names,
- parquet_compression_type, parquet_disable_dictionary,
TParquetVersion::PARQUET_1_0,
- false, _iceberg_schema_json, &_schema));
+ state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names, false,
+ parquet_options, _iceberg_schema_json, &_schema));
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index bb6a54c4693..555b44ea3df 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -134,8 +134,9 @@ Status VFileResultWriter::_create_file_writer(const
std::string& file_name) {
case TFileFormatType::FORMAT_PARQUET:
_vfile_writer.reset(new VParquetTransformer(
_state, _file_writer_impl.get(), _vec_output_expr_ctxs,
_file_opts->parquet_schemas,
- _file_opts->parquet_commpression_type,
_file_opts->parquert_disable_dictionary,
- _file_opts->parquet_version, _output_object_data));
+ _output_object_data,
+ {_file_opts->parquet_commpression_type,
_file_opts->parquet_version,
+ _file_opts->parquert_disable_dictionary,
_file_opts->enable_int96_timestamps}));
break;
case TFileFormatType::FORMAT_ORC:
_vfile_writer.reset(new VOrcTransformer(
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index b93303dff03..70f588c4c7e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -70,7 +70,6 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* profile)
switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
- bool parquet_disable_dictionary = false;
TParquetCompressionType::type parquet_compression_type;
switch (_hive_compress_type) {
case TFileCompressType::PLAIN: {
@@ -90,10 +89,11 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* profile)
to_string(_hive_compress_type));
}
}
+ ParquetFileOptions parquet_options = {parquet_compression_type,
+ TParquetVersion::PARQUET_1_0,
false, true};
_file_format_transformer = std::make_unique<VParquetTransformer>(
- state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names,
- parquet_compression_type, parquet_disable_dictionary,
TParquetVersion::PARQUET_1_0,
- false);
+ state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names, false,
+ parquet_options);
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql
new file mode 100644
index 00000000000..53778efd09c
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run12.sql
@@ -0,0 +1,13 @@
+
+use demo.test_db;
+
+
+drop table if exists t_ntz_doris;
+CREATE TABLE t_ntz_doris (
+ col TIMESTAMP_NTZ)
+USING iceberg;
+
+drop table if exists t_tz_doris;
+CREATE TABLE t_tz_doris (
+ col TIMESTAMP)
+USING iceberg;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
index 18d1484e596..9c4d5f2ae49 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
public class ParquetFileFormatProperties extends FileFormatProperties {
public static final String PARQUET_DISABLE_DICTIONARY =
"disable_dictionary";
public static final String PARQUET_VERSION = "version";
+ public static final String ENABLE_INT96_TIMESTAMPS =
"enable_int96_timestamps";
public static final String PARQUET_PROP_PREFIX = "parquet.";
public static final Logger LOG =
LogManager.getLogger(ParquetFileFormatProperties.class);
@@ -60,6 +61,7 @@ public class ParquetFileFormatProperties extends
FileFormatProperties {
private TParquetCompressionType parquetCompressionType =
TParquetCompressionType.SNAPPY;
private boolean parquetDisableDictionary = false;
private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+ private boolean enableInt96Timestamps = true;
public ParquetFileFormatProperties() {
super(TFileFormatType.FORMAT_PARQUET,
FileFormatProperties.FORMAT_PARQUET);
@@ -82,6 +84,11 @@ public class ParquetFileFormatProperties extends
FileFormatProperties {
}
}
+ //save the enable int96 timestamp property
+ if (formatProperties.containsKey(ENABLE_INT96_TIMESTAMPS)) {
+ this.enableInt96Timestamps =
Boolean.valueOf(formatProperties.get(ENABLE_INT96_TIMESTAMPS)).booleanValue();
+ }
+
// save all parquet prefix property
Iterator<Entry<String, String>> iter =
formatProperties.entrySet().iterator();
while (iter.hasNext()) {
@@ -109,6 +116,7 @@ public class ParquetFileFormatProperties extends
FileFormatProperties {
sinkOptions.setParquetCompressionType(parquetCompressionType);
sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
sinkOptions.setParquetVersion(parquetVersion);
+ sinkOptions.setEnableInt96Timestamps(enableInt96Timestamps);
}
@Override
@@ -126,4 +134,8 @@ public class ParquetFileFormatProperties extends
FileFormatProperties {
public boolean isParquetDisableDictionary() {
return parquetDisableDictionary;
}
+
+ public boolean isEnableInt96Timestamps() {
+ return enableInt96Timestamps;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
index 754d857613f..370e4965765 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
@@ -94,6 +94,17 @@ public class ParquetFileFormatPropertiesTest {
Assert.assertFalse(parquetFileFormatProperties.isParquetDisableDictionary());
}
+ @Test
+ public void testEnableInt96Timestamps() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("enable_int96_timestamps", "true");
+ parquetFileFormatProperties.analyzeFileFormatProperties(properties,
true);
+
Assert.assertTrue(parquetFileFormatProperties.isEnableInt96Timestamps());
+ properties.put("enable_int96_timestamps", "false");
+ parquetFileFormatProperties.analyzeFileFormatProperties(properties,
true);
+
Assert.assertFalse(parquetFileFormatProperties.isEnableInt96Timestamps());
+ }
+
@Test
public void testParquetVersion() {
Map<String, String> properties = new HashMap<>();
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 30348c091ee..73f91c84311 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -140,6 +140,11 @@ struct TResultFileSinkOptions {
// orc_writer_version = 1 means doris FE is higher than version 2.1.5
// orc_writer_version = 0 means doris FE is less than or equal to version
2.1.5
20: optional i64 orc_writer_version;
+
+ // The Iceberg write sink writes timestamps as int64.
+ // The Hive write sink writes timestamps as int96.
+ // Export to file follows the user-defined property.
+ 21: optional bool enable_int96_timestamps
}
struct TMemoryScratchSink {
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out
new file mode 100644
index 00000000000..1980a167f36
Binary files /dev/null and
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy
new file mode 100644
index 00000000000..33418c0e247
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_timestamp_ntz.groovy
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_write_timestamp_ntz",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+
+ try {
+
+ String rest_port =
context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port =
context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "iceberg_timestamp_ntz_test"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """ use test_db;"""
+
+ sql """INSERT INTO t_ntz_doris VALUES ('2025-02-07 20:12:00');"""
+ sql """INSERT INTO t_tz_doris VALUES ('2025-02-07 20:12:01');"""
+
+
+ sql "set time_zone = 'Asia/Shanghai'"
+ qt_timestamp_ntz """select * from t_ntz_doris;"""
+ qt_timestamp_tz """select * from t_tz_doris;"""
+
+ sql "set time_zone = 'Europe/Tirane'"
+ qt_timestamp_ntz2 """select * from t_ntz_doris;"""
+ qt_timestamp_tz2 """select * from t_tz_doris;"""
+
+ // sql """drop catalog if exists ${catalog_name}"""
+
+ } finally {
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]