This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 47fbbfabace branch-3.0: [fix](iceberg)Bring field_id with parquet 
files And fix map type's key optional #44470 (#44827)
47fbbfabace is described below

commit 47fbbfabaced50ce78228944824d8e49579975f5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 2 10:48:52 2024 +0800

    branch-3.0: [fix](iceberg)Bring field_id with parquet files And fix map 
type's key optional #44470 (#44827)
    
    Cherry-picked from #44470
    
    Co-authored-by: wuwenchi <wuwen...@selectdb.com>
---
 .../format/table/iceberg/arrow_schema_util.cpp     | 134 +++++++++
 .../exec/format/table/iceberg/arrow_schema_util.h  |  45 +++
 be/src/vec/exec/format/table/iceberg/types.cpp     |   3 +-
 be/src/vec/exec/format/table/iceberg/types.h       |   4 +
 be/src/vec/runtime/vparquet_transformer.cpp        |  51 ++--
 be/src/vec/runtime/vparquet_transformer.h          |   5 +-
 .../writer/iceberg/viceberg_partition_writer.cpp   |   2 +-
 .../table/iceberg/arrow_schema_util_test.cpp       | 304 +++++++++++++++++++++
 .../format/table/iceberg/schema_parser_test.cpp    |  24 ++
 9 files changed, 547 insertions(+), 25 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp 
b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
new file mode 100644
index 00000000000..35a4d51b7f1
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+
+#include <arrow/type.h>
+#include <arrow/util/key_value_metadata.h>
+
+namespace doris {
+namespace iceberg {
+
+// Metadata key under which convert_to() stores the Iceberg field id of every Arrow field.
+const char* ArrowSchemaUtil::PARQUET_FIELD_ID = "PARQUET:field_id";
+// Metadata key/value pair that convert_to() additionally attaches to MAP fields.
+const char* ArrowSchemaUtil::ORIGINAL_TYPE = "originalType";
+const char* ArrowSchemaUtil::MAP_TYPE_VALUE = "mapType";
+
+// Converts every top-level column of an Iceberg schema into an Arrow field and
+// appends the results to `fields` in schema order.
+//
+// `timezone` is forwarded to convert_to(), which uses it for TIMESTAMP columns.
+// Returns the first conversion error encountered; fields converted before the
+// failure remain in `fields`. A null `schema` is reported as an error instead of
+// being dereferenced.
+Status ArrowSchemaUtil::convert(const Schema* schema, const std::string& timezone,
+                                std::vector<std::shared_ptr<arrow::Field>>& fields) {
+    if (schema == nullptr) {
+        return Status::InternalError("Iceberg schema is null");
+    }
+    for (const auto& column : schema->columns()) {
+        std::shared_ptr<arrow::Field> arrow_field;
+        RETURN_IF_ERROR(convert_to(column, &arrow_field, timezone));
+        fields.push_back(arrow_field);
+    }
+    return Status::OK();
+}
+
+// Converts a single Iceberg field into an Arrow field, recursing into the
+// children of STRUCT, LIST and MAP types.
+//
+// The resulting field keeps the Iceberg field name, is nullable exactly when the
+// Iceberg field is optional, and carries the Iceberg field id in its metadata
+// under PARQUET_FIELD_ID. MAP fields additionally get ORIGINAL_TYPE = MAP_TYPE_VALUE.
+//
+// `timezone` is only used for TIMESTAMP columns. On success `*arrow_field` is
+// set; on failure it is left untouched and an InternalError is returned.
+Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field,
+                                   std::shared_ptr<arrow::Field>* arrow_field,
+                                   const std::string& timezone) {
+    std::shared_ptr<arrow::DataType> arrow_type;
+    std::unordered_map<std::string, std::string> metadata;
+    metadata[PARQUET_FIELD_ID] = std::to_string(field.field_id());
+
+    switch (field.field_type()->type_id()) {
+    case iceberg::TypeID::BOOLEAN:
+        arrow_type = arrow::boolean();
+        break;
+
+    case iceberg::TypeID::INTEGER:
+        arrow_type = arrow::int32();
+        break;
+
+    case iceberg::TypeID::LONG:
+        arrow_type = arrow::int64();
+        break;
+
+    case iceberg::TypeID::FLOAT:
+        arrow_type = arrow::float32();
+        break;
+
+    case iceberg::TypeID::DOUBLE:
+        arrow_type = arrow::float64();
+        break;
+
+    case iceberg::TypeID::DATE:
+        arrow_type = arrow::date32();
+        break;
+
+    case iceberg::TypeID::TIMESTAMP: {
+        // Timestamps are written with microsecond resolution in the given timezone.
+        arrow_type = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
+        break;
+    }
+
+    // NOTE(review): binary-like types are all mapped to utf8 here; confirm this
+    // matches what the block-to-Arrow data conversion produces for them.
+    case iceberg::TypeID::BINARY:
+    case iceberg::TypeID::STRING:
+    case iceberg::TypeID::UUID:
+    case iceberg::TypeID::FIXED:
+        arrow_type = arrow::utf8();
+        break;
+
+    case iceberg::TypeID::DECIMAL: {
+        auto* dt = dynamic_cast<DecimalType*>(field.field_type());
+        // Guard against a type whose id says DECIMAL but whose dynamic type is not
+        // DecimalType, instead of dereferencing a null pointer.
+        if (dt == nullptr) {
+            return Status::InternalError("Invalid decimal field type:" +
+                                         field.field_type()->to_string());
+        }
+        arrow_type = arrow::decimal(dt->get_precision(), dt->get_scale());
+        break;
+    }
+
+    case iceberg::TypeID::STRUCT: {
+        std::vector<std::shared_ptr<arrow::Field>> element_fields;
+        StructType* st = field.field_type()->as_struct_type();
+        for (const auto& column : st->fields()) {
+            std::shared_ptr<arrow::Field> element_field;
+            RETURN_IF_ERROR(convert_to(column, &element_field, timezone));
+            element_fields.push_back(element_field);
+        }
+        arrow_type = arrow::struct_(element_fields);
+        break;
+    }
+
+    case iceberg::TypeID::LIST: {
+        std::shared_ptr<arrow::Field> item_field;
+        ListType* list_type = field.field_type()->as_list_type();
+        RETURN_IF_ERROR(convert_to(list_type->element_field(), &item_field, timezone));
+        arrow_type = arrow::list(item_field);
+        break;
+    }
+
+    case iceberg::TypeID::MAP: {
+        std::shared_ptr<arrow::Field> key_field;
+        std::shared_ptr<arrow::Field> value_field;
+        MapType* map_type = field.field_type()->as_map_type();
+        RETURN_IF_ERROR(convert_to(map_type->key_field(), &key_field, timezone));
+        RETURN_IF_ERROR(convert_to(map_type->value_field(), &value_field, timezone));
+        metadata[ORIGINAL_TYPE] = MAP_TYPE_VALUE;
+        arrow_type = std::make_shared<arrow::MapType>(key_field, value_field);
+        break;
+    }
+
+    // TIME is listed explicitly to make clear it is deliberately unsupported.
+    case iceberg::TypeID::TIME:
+    default:
+        return Status::InternalError("Unsupported field type:" + field.field_type()->to_string());
+    }
+
+    std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
+            std::make_shared<arrow::KeyValueMetadata>(metadata);
+    *arrow_field =
+            arrow::field(field.field_name(), arrow_type, field.is_optional(), schema_metadata);
+    return Status::OK();
+}
+
+} // namespace iceberg
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h 
b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
new file mode 100644
index 00000000000..20b7dbc627c
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+
+#include <shared_mutex>
+
+#include "vec/exec/format/table/iceberg/schema.h"
+
+namespace doris {
+namespace iceberg {
+
+// Static helpers that translate an Iceberg schema into Arrow fields. Each
+// produced field carries its Iceberg field id as Arrow field metadata.
+class ArrowSchemaUtil {
+public:
+    // Converts every column of `schema` and appends the resulting Arrow fields
+    // to `fields`. `timezone` is passed through to the per-field conversion.
+    static Status convert(const Schema* schema, const std::string& timezone,
+                          std::vector<std::shared_ptr<arrow::Field>>& fields);
+
+private:
+    // Metadata key that holds the Iceberg field id.
+    static const char* PARQUET_FIELD_ID;
+    // Metadata key/value pair attached to MAP fields.
+    static const char* ORIGINAL_TYPE;
+    static const char* MAP_TYPE_VALUE;
+
+    // Converts one Iceberg field (including nested children) and stores the
+    // result in `*arrow_field`.
+    static Status convert_to(const iceberg::NestedField& field,
+                             std::shared_ptr<arrow::Field>* arrow_field,
+                             const std::string& timezone);
+};
+
+} // namespace iceberg
+} // namespace doris
diff --git a/be/src/vec/exec/format/table/iceberg/types.cpp 
b/be/src/vec/exec/format/table/iceberg/types.cpp
index b56a231979a..bf643655ab8 100644
--- a/be/src/vec/exec/format/table/iceberg/types.cpp
+++ b/be/src/vec/exec/format/table/iceberg/types.cpp
@@ -25,8 +25,9 @@ namespace iceberg {
 std::unique_ptr<MapType> MapType::of_optional(int key_id, int value_id,
                                               std::unique_ptr<Type> key_type,
                                               std::unique_ptr<Type> 
value_type) {
+    // key is always required
     auto key_field =
-            std::make_unique<NestedField>(true, key_id, "key", 
std::move(key_type), std::nullopt);
+            std::make_unique<NestedField>(false, key_id, "key", 
std::move(key_type), std::nullopt);
     auto value_field = std::make_unique<NestedField>(true, value_id, "value", 
std::move(value_type),
                                                      std::nullopt);
     return std::unique_ptr<MapType>(new MapType(std::move(key_field), 
std::move(value_field)));
diff --git a/be/src/vec/exec/format/table/iceberg/types.h 
b/be/src/vec/exec/format/table/iceberg/types.h
index f5262b36f55..91a2f705df0 100644
--- a/be/src/vec/exec/format/table/iceberg/types.h
+++ b/be/src/vec/exec/format/table/iceberg/types.h
@@ -265,6 +265,10 @@ public:
         ss << "decimal(" << precision << ", " << scale << ")";
         return ss.str();
     }
+
+    // Returns the precision, i.e. the first number in the `decimal(p, s)` string form.
+    int get_precision() const { return precision; }
+
+    // Returns the scale, i.e. the second number in the `decimal(p, s)` string form.
+    int get_scale() const { return scale; }
 };
 
 class BinaryType : public PrimitiveType {
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index f0810d6c7ce..86ca54909f7 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -65,6 +65,7 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/functions/function_helpers.h"
@@ -201,21 +202,20 @@ void 
ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
     }
 }
 
-VParquetTransformer::VParquetTransformer(RuntimeState* state, 
doris::io::FileWriter* file_writer,
-                                         const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                         std::vector<std::string> column_names,
-                                         TParquetCompressionType::type 
compression_type,
-                                         bool parquet_disable_dictionary,
-                                         TParquetVersion::type parquet_version,
-                                         bool output_object_data,
-                                         const std::string* 
iceberg_schema_json)
+VParquetTransformer::VParquetTransformer(
+        RuntimeState* state, doris::io::FileWriter* file_writer,
+        const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string> 
column_names,
+        TParquetCompressionType::type compression_type, bool 
parquet_disable_dictionary,
+        TParquetVersion::type parquet_version, bool output_object_data,
+        const std::string* iceberg_schema_json, const iceberg::Schema* 
iceberg_schema)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
           _column_names(std::move(column_names)),
           _parquet_schemas(nullptr),
           _compression_type(compression_type),
           _parquet_disable_dictionary(parquet_disable_dictionary),
           _parquet_version(parquet_version),
-          _iceberg_schema_json(iceberg_schema_json) {
+          _iceberg_schema_json(iceberg_schema_json),
+          _iceberg_schema(iceberg_schema) {
     _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
 }
 
@@ -233,6 +233,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* 
state, doris::io::FileWri
           _parquet_disable_dictionary(parquet_disable_dictionary),
           _parquet_version(parquet_version),
           _iceberg_schema_json(iceberg_schema_json) {
+    _iceberg_schema = nullptr;
     _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
 }
 
@@ -264,21 +265,27 @@ Status VParquetTransformer::_parse_properties() {
 
 Status VParquetTransformer::_parse_schema() {
     std::vector<std::shared_ptr<arrow::Field>> fields;
-    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-        std::shared_ptr<arrow::DataType> type;
-        
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), 
&type,
-                                              _state->timezone()));
-        if (_parquet_schemas != nullptr) {
-            std::shared_ptr<arrow::Field> field =
-                    
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
-                                 _output_vexpr_ctxs[i]->root()->is_nullable());
-            fields.emplace_back(field);
-        } else {
-            std::shared_ptr<arrow::Field> field = arrow::field(
-                    _column_names[i], type, 
_output_vexpr_ctxs[i]->root()->is_nullable());
-            fields.emplace_back(field);
+    if (_iceberg_schema != nullptr) {
+        RETURN_IF_ERROR(
+                iceberg::ArrowSchemaUtil::convert(_iceberg_schema, 
_state->timezone(), fields));
+    } else {
+        for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+            std::shared_ptr<arrow::DataType> type;
+            
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), 
&type,
+                                                  _state->timezone()));
+            if (_parquet_schemas != nullptr) {
+                std::shared_ptr<arrow::Field> field =
+                        
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
+                                     
_output_vexpr_ctxs[i]->root()->is_nullable());
+                fields.emplace_back(field);
+            } else {
+                std::shared_ptr<arrow::Field> field = arrow::field(
+                        _column_names[i], type, 
_output_vexpr_ctxs[i]->root()->is_nullable());
+                fields.emplace_back(field);
+            }
         }
     }
+
     if (_iceberg_schema_json != nullptr) {
         std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
                 arrow::KeyValueMetadata::Make({"iceberg.schema"}, 
{*_iceberg_schema_json});
diff --git a/be/src/vec/runtime/vparquet_transformer.h 
b/be/src/vec/runtime/vparquet_transformer.h
index 9eae25d8ac4..03c9aeb0816 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -27,6 +27,7 @@
 #include <parquet/types.h>
 #include <stdint.h>
 
+#include "vec/exec/format/table/iceberg/schema.h"
 #include "vfile_format_transformer.h"
 
 namespace doris {
@@ -94,7 +95,8 @@ public:
                         std::vector<std::string> column_names,
                         TParquetCompressionType::type compression_type,
                         bool parquet_disable_dictionary, TParquetVersion::type 
parquet_version,
-                        bool output_object_data, const std::string* 
iceberg_schema_json = nullptr);
+                        bool output_object_data, const std::string* 
iceberg_schema_json = nullptr,
+                        const iceberg::Schema* iceberg_schema = nullptr);
 
     VParquetTransformer(RuntimeState* state, doris::io::FileWriter* 
file_writer,
                         const VExprContextSPtrs& output_vexpr_ctxs,
@@ -131,6 +133,7 @@ private:
     const TParquetVersion::type _parquet_version;
     const std::string* _iceberg_schema_json;
     uint64_t _write_size = 0;
+    const iceberg::Schema* _iceberg_schema;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 924adf68145..23ee389a8b7 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -84,7 +84,7 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profil
         _file_format_transformer.reset(new VParquetTransformer(
                 state, _file_writer.get(), _write_output_expr_ctxs, 
_write_column_names,
                 parquet_compression_type, parquet_disable_dictionary, 
TParquetVersion::PARQUET_1_0,
-                false, _iceberg_schema_json));
+                false, _iceberg_schema_json, &_schema));
         return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
diff --git a/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp 
b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp
new file mode 100644
index 00000000000..b5f61c9d2e3
--- /dev/null
+++ b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/key_value_metadata.h>
+#include <gtest/gtest.h>
+#include <parquet/api/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/schema.h>
+
+#include "io/fs/local_file_system.h"
+#include "vec/exec/format/table/iceberg/schema.h"
+#include "vec/exec/format/table/iceberg/schema_parser.h"
+
+namespace doris {
+namespace iceberg {
+
+// Test fixture type for ArrowSchemaUtil; it adds no state or setup of its own.
+class ArrowSchemaUtilTest : public testing::Test {
+public:
+    ArrowSchemaUtilTest() = default;
+    // `override` (rather than `virtual`) lets the compiler verify that this really
+    // overrides the base-class destructor.
+    ~ArrowSchemaUtilTest() override = default;
+};
+
+// Arrow field-metadata key the tests below read the Iceberg field id from.
+const std::string_view pfid = "PARQUET:field_id";
+
+// Two required primitive columns must convert to two Arrow fields that keep
+// their names and expose the Iceberg field ids through the field metadata.
+TEST(ArrowSchemaUtilTest, test_simple_field) {
+    std::vector<NestedField> nested_fields;
+    nested_fields.reserve(2);
+    NestedField field1(false, 1, "field1", std::make_unique<IntegerType>(), std::nullopt);
+    NestedField field2(false, 2, "field2", std::make_unique<StringType>(), std::nullopt);
+    nested_fields.emplace_back(std::move(field1));
+    nested_fields.emplace_back(std::move(field2));
+
+    Schema schema(1, std::move(nested_fields));
+
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    Status st = ArrowSchemaUtil::convert(&schema, "utc", fields);
+    // Fatal assertions: the element accesses below are out of bounds if the
+    // conversion failed or produced fewer fields than expected.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(2, fields.size());
+    EXPECT_EQ("field1", fields[0]->name());
+    EXPECT_EQ("field2", fields[1]->name());
+    // Fatal as well: metadata() is dereferenced right after.
+    ASSERT_TRUE(fields[0]->HasMetadata());
+    ASSERT_TRUE(fields[1]->HasMetadata());
+    EXPECT_EQ("1", fields[0]->metadata()->Get(pfid).ValueUnsafe());
+    EXPECT_EQ("2", fields[1]->metadata()->Get(pfid).ValueUnsafe());
+}
+
+// A struct column must convert to an Arrow struct whose children keep their
+// names and field ids.
+TEST(ArrowSchemaUtilTest, test_stuct_field) {
+    // struct_json comes from :
+    //     Schema schema = new Schema(
+    //     Types.NestedField.optional(
+    //         21, "st_col", Types.StructType.of(
+    //             Types.NestedField.optional(32, "st_col_c1", Types.IntegerType.get()),
+    //             Types.NestedField.optional(43, "st_col_c2", Types.StringType.get())
+    //         )
+    //     )
+    // );
+    // StringWriter writer = new StringWriter();
+    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+    // SchemaParser.toJson(schema.asStruct(), generator);
+    // generator.flush();
+    // System.out.println(writer.toString());
+
+    const std::string struct_json = R"({
+        "type": "struct",
+        "fields": [
+            {
+                "id": 21,
+                "name": "st_col",
+                "required": false,
+                "type": {
+                    "type": "struct",
+                    "fields": [
+                        {
+                            "id": 32,
+                            "name": "st_col_c1",
+                            "required": false,
+                            "type": "int"
+                        },
+                        {
+                            "id": 43,
+                            "name": "st_col_c2",
+                            "required": false,
+                            "type": "string"
+                        }
+                    ]
+                }
+            }
+        ]
+    })";
+    std::unique_ptr<Schema> schema = SchemaParser::from_json(struct_json);
+
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    Status st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
+    // Fatal assertions guard the indexing and dereferences that follow.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(1, fields.size());
+    EXPECT_EQ("st_col", fields[0]->name());
+    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());
+
+    auto* arrow_struct = dynamic_cast<arrow::StructType*>(fields[0]->type().get());
+    // The cast yields nullptr if the converted type is not a struct.
+    ASSERT_NE(nullptr, arrow_struct);
+    const auto& struct_fields = arrow_struct->fields();
+    ASSERT_EQ(2, struct_fields.size());
+    EXPECT_EQ("st_col_c1", struct_fields.at(0)->name());
+    EXPECT_EQ("st_col_c2", struct_fields.at(1)->name());
+    EXPECT_EQ("32", struct_fields.at(0)->metadata()->Get(pfid).ValueUnsafe());
+    EXPECT_EQ("43", struct_fields.at(1)->metadata()->Get(pfid).ValueUnsafe());
+}
+
+// A map column must convert to an Arrow map whose key and value fields keep
+// their field ids; the key must be required and the value optional.
+TEST(ArrowSchemaUtilTest, test_map_field) {
+    // map_json comes from :
+    // Schema schema = new Schema(
+    //     Types.NestedField.optional(
+    //         21, "map_col", Types.MapType.ofOptional(
+    //             32, 43, Types.IntegerType.get(), Types.StringType.get()
+    //         )
+    //     )
+    // );
+    // StringWriter writer = new StringWriter();
+    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+    // SchemaParser.toJson(schema.asStruct(), generator);
+    // generator.flush();
+    // System.out.println(writer.toString());
+
+    const std::string map_json = R"({
+        "type": "struct",
+        "fields": [
+            {
+                "id": 21,
+                "name": "map_col",
+                "required": false,
+                "type": {
+                    "type": "map",
+                    "key-id": 32,
+                    "key": "int",
+                    "value-id": 43,
+                    "value": "string",
+                    "value-required": false
+                }
+            }
+        ]
+    })";
+    std::unique_ptr<Schema> schema = SchemaParser::from_json(map_json);
+
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    Status st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
+    // Fatal assertions guard the indexing and dereferences that follow.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(1, fields.size());
+    EXPECT_EQ("map_col", fields[0]->name());
+    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());
+
+    auto* arrow_map = dynamic_cast<arrow::MapType*>(fields[0]->type().get());
+    // The cast yields nullptr if the converted type is not a map.
+    ASSERT_NE(nullptr, arrow_map);
+    EXPECT_EQ(1, arrow_map->fields().size());
+    EXPECT_EQ("key", arrow_map->key_field()->name());
+    EXPECT_EQ("value", arrow_map->item_field()->name());
+    EXPECT_EQ("32", arrow_map->key_field()->metadata()->Get(pfid).ValueUnsafe());
+    EXPECT_EQ("43", arrow_map->item_field()->metadata()->Get(pfid).ValueUnsafe());
+    // Map keys are always required; the value is optional ("value-required": false).
+    EXPECT_FALSE(arrow_map->key_field()->nullable());
+    EXPECT_TRUE(arrow_map->item_field()->nullable());
+}
+
+// A list column must convert to an Arrow list whose element field keeps its
+// name and field id.
+TEST(ArrowSchemaUtilTest, test_list_field) {
+    // list_json comes from :
+    // Schema schema = new Schema(
+    //     Types.NestedField.optional(
+    //         21, "list_col", Types.ListType.ofOptional(
+    //             32, Types.IntegerType.get())));
+    // StringWriter writer = new StringWriter();
+    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+    // SchemaParser.toJson(schema.asStruct(), generator);
+    // generator.flush();
+    // System.out.println(writer.toString());
+
+    const std::string list_json = R"({
+        "type": "struct",
+        "fields": [
+            {
+                "id": 21,
+                "name": "list_col",
+                "required": false,
+                "type": {
+                    "type": "list",
+                    "element-id": 32,
+                    "element": "int",
+                    "element-required": false
+                }
+            }
+        ]
+    })";
+    std::unique_ptr<Schema> schema = SchemaParser::from_json(list_json);
+
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    Status st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
+    // Fatal assertions guard the indexing and dereferences that follow.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(1, fields.size());
+    EXPECT_EQ("list_col", fields[0]->name());
+    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());
+
+    auto* arrow_list = dynamic_cast<arrow::ListType*>(fields[0]->type().get());
+    // The cast yields nullptr if the converted type is not a list.
+    ASSERT_NE(nullptr, arrow_list);
+    EXPECT_EQ(1, arrow_list->fields().size());
+    EXPECT_EQ("element", arrow_list->value_field()->name());
+    EXPECT_EQ("32", arrow_list->value_field()->metadata()->Get(pfid).ValueUnsafe());
+}
+
+// End-to-end check: a table written to Parquet with the converted Arrow schema
+// must expose the Iceberg field ids as Parquet field ids when read back.
+TEST(ArrowSchemaUtilTest, test_parquet_filed_id) {
+    std::string test_dir = "ut_dir/test_parquet_filed_id";
+    Status st;
+    st = io::global_local_filesystem()->delete_directory(test_dir);
+    ASSERT_TRUE(st.ok()) << st;
+    st = io::global_local_filesystem()->create_directory(test_dir);
+    ASSERT_TRUE(st.ok()) << st;
+
+    std::shared_ptr<arrow::Array> id_array;
+    std::shared_ptr<arrow::Array> name_array;
+
+    arrow::Int32Builder id_builder;
+    ASSERT_TRUE(id_builder.Append(1).ok());
+    ASSERT_TRUE(id_builder.Append(2).ok());
+    ASSERT_TRUE(id_builder.Append(3).ok());
+    auto&& result_id = id_builder.Finish();
+    ASSERT_TRUE(result_id.ok());
+    id_array = std::move(result_id).ValueUnsafe();
+
+    arrow::StringBuilder name_builder;
+    ASSERT_TRUE(name_builder.Append("Alice").ok());
+    ASSERT_TRUE(name_builder.Append("Bob").ok());
+    ASSERT_TRUE(name_builder.Append("Charlie").ok());
+    auto&& result_name = name_builder.Finish();
+    ASSERT_TRUE(result_name.ok());
+    name_array = std::move(result_name).ValueUnsafe();
+
+    // Define the table schema
+    std::vector<NestedField> nested_fields;
+    nested_fields.reserve(2);
+    NestedField field1(false, 17, "field_1", std::make_unique<IntegerType>(), std::nullopt);
+    NestedField field2(false, 36, "field_2", std::make_unique<StringType>(), std::nullopt);
+    nested_fields.emplace_back(std::move(field1));
+    nested_fields.emplace_back(std::move(field2));
+
+    Schema schema(1, std::move(nested_fields));
+
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    st = ArrowSchemaUtil::convert(&schema, "utc", fields);
+    // The schema built below is only meaningful if the conversion succeeded.
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(2, fields.size());
+    auto arrow_schema = arrow::schema(fields);
+
+    // create arrow table
+    auto table = arrow::Table::Make(arrow_schema, {id_array, name_array});
+
+    std::string file_path = test_dir + "/f1.parquet";
+    std::shared_ptr<arrow::io::FileOutputStream> outfile;
+    auto&& result_file = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(result_file.ok());
+    outfile = std::move(result_file).ValueUnsafe();
+
+    // arrow table to parquet file; report a failure through gtest instead of
+    // throwing an exception out of the test body.
+    arrow::Status write_status =
+            parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1024);
+    ASSERT_TRUE(write_status.ok()) << write_status.ToString();
+    // Close the stream so the file is complete before it is opened for reading.
+    arrow::Status close_status = outfile->Close();
+    ASSERT_TRUE(close_status.ok()) << close_status.ToString();
+
+    // open parquet with parquet's API
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+            parquet::ParquetFileReader::OpenFile(file_path, false);
+
+    // get MessageType
+    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+    auto schema_descriptor = file_metadata->schema();
+    const parquet::schema::Node& root = *schema_descriptor->group_node();
+    const auto& group_node = static_cast<const parquet::schema::GroupNode&>(root);
+
+    // Fatal: field(0) and field(1) are accessed right after.
+    ASSERT_EQ(2, group_node.field_count());
+    auto field1_node = group_node.field(0);
+    auto field2_node = group_node.field(1);
+    EXPECT_EQ("field_1", field1_node->name());
+    EXPECT_EQ(17, field1_node->field_id());
+    EXPECT_EQ("field_2", field2_node->name());
+    EXPECT_EQ(36, field2_node->field_id());
+
+    st = io::global_local_filesystem()->delete_directory(test_dir);
+    EXPECT_TRUE(st.ok()) << st;
+}
+
+} // namespace iceberg
+} // namespace doris
diff --git a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp 
b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
index 4c3f58cdd10..f464525a7f9 100644
--- a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
+++ b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
@@ -78,6 +78,15 @@ const std::string valid_map_json = R"({
     "value-required": true
 })";
 
+const std::string valid_map_json2 = R"({
+    "type": "map",
+    "key-id": 4,
+    "key": "string",
+    "value-id": 5,
+    "value": "int",
+    "value-required": false
+})";
+
 const std::string nested_list_json = R"({
     "type": "list",
     "element-id": 6,
@@ -209,6 +218,21 @@ TEST(SchemaParserTest, parse_valid_map) {
             
SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json.c_str()));
     ASSERT_NE(type, nullptr);
     EXPECT_EQ(type->to_string(), "map<string, int>");
+    EXPECT_TRUE(type->is_map_type());
+    MapType* mt = type->as_map_type();
+    EXPECT_TRUE(mt->field(4)->is_required());
+    EXPECT_TRUE(mt->field(5)->is_required());
+}
+
+// A map whose JSON has "value-required": false must parse with a required key
+// and an optional value.
+TEST(SchemaParserTest, parse_valid_map2) {
+    std::unique_ptr<Type> type =
+            SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json2.c_str()));
+    ASSERT_NE(type, nullptr);
+    EXPECT_EQ(type->to_string(), "map<string, int>");
+    // Fatal: the map-specific accesses below only make sense for a map type.
+    ASSERT_TRUE(type->is_map_type());
+    MapType* mt = type->as_map_type();
+    ASSERT_NE(mt, nullptr);
+    EXPECT_TRUE(mt->field(4)->is_required());
+    EXPECT_TRUE(mt->field(5)->is_optional());
 }
 
 TEST(SchemaParserTest, parse_nested_list) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to