This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 667f5e6e6a8 [feat](iceberg)Supports using rest type catalog to read tables in unity catalog for 2.1 (#43525) (#45217)
667f5e6e6a8 is described below

commit 667f5e6e6a8700c2313d4ae19139c5d74b6393c5
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Thu Dec 12 16:49:36 2024 +0800

    [feat](iceberg)Supports using rest type catalog to read tables in unity catalog for 2.1 (#43525) (#45217)
    
    bp: #43525
---
 be/src/io/fs/file_system.cpp                       | 34 ++++++---
 be/src/io/fs/file_system.h                         |  8 ++-
 be/src/io/fs/local_file_system.cpp                 | 82 +++++++++++++++++++---
 be/src/io/fs/local_file_system.h                   |  6 ++
 be/src/io/fs/remote_file_system.cpp                | 10 ++-
 be/src/io/fs/s3_file_system.h                      |  7 +-
 be/src/vec/exec/format/parquet/schema_desc.cpp     | 18 +++++
 be/src/vec/exec/format/parquet/schema_desc.h       |  7 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  6 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  2 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 48 ++++---------
 be/src/vec/exec/format/table/iceberg_reader.h      |  2 +-
 be/test/io/fs/local_file_system_test.cpp           | 51 ++++++++++++++
 .../apache/doris/datasource/ExternalCatalog.java   |  6 ++
 .../doris/datasource/hive/HMSExternalCatalog.java  | 20 ++++--
 .../datasource/iceberg/IcebergExternalCatalog.java |  3 +-
 .../datasource/iceberg/IcebergMetadataCache.java   | 14 ++--
 .../datasource/iceberg/IcebergMetadataOps.java     | 69 +++++++++++++-----
 .../iceberg/IcebergRestExternalCatalog.java        |  2 -
 .../datasource/iceberg/IcebergTransaction.java     |  3 +-
 .../operations/ExternalMetadataOperations.java     |  4 +-
 .../datasource/operations/ExternalMetadataOps.java |  4 ++
 .../iceberg/iceberg_read_unitycatalog_table.out    | 40 +++++++++++
 .../iceberg/iceberg_read_unitycatalog_table.groovy | 62 ++++++++++++++++
 24 files changed, 398 insertions(+), 110 deletions(-)

diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp
index 3579a5323d9..e6b5ef7df1a 100644
--- a/be/src/io/fs/file_system.cpp
+++ b/be/src/io/fs/file_system.cpp
@@ -25,58 +25,70 @@ namespace io {
 
 Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
                                const FileWriterOptions* opts) {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(create_file_impl(path, writer, opts));
 }
 
 Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
                              const FileReaderOptions* opts) {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(open_file_impl(path, reader, opts));
 }
 
 Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
-    auto path = absolute_path(dir);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(dir, path));
     FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
 }
 
 Status FileSystem::delete_file(const Path& file) {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(delete_file_impl(path));
 }
 
 Status FileSystem::delete_directory(const Path& dir) {
-    auto path = absolute_path(dir);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(dir, path));
     FILESYSTEM_M(delete_directory_impl(path));
 }
 
 Status FileSystem::batch_delete(const std::vector<Path>& files) {
     std::vector<Path> abs_files;
     for (auto& file : files) {
-        abs_files.push_back(absolute_path(file));
+        Path abs_file;
+        RETURN_IF_ERROR(absolute_path(file, abs_file));
+        abs_files.push_back(abs_file);
     }
     FILESYSTEM_M(batch_delete_impl(abs_files));
 }
 
 Status FileSystem::exists(const Path& path, bool* res) const {
-    auto fs_path = absolute_path(path);
+    Path fs_path;
+    RETURN_IF_ERROR(absolute_path(path, fs_path));
     FILESYSTEM_M(exists_impl(fs_path, res));
 }
 
 Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(file_size_impl(path, file_size));
 }
 
 Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
                         bool* exists) {
-    auto path = absolute_path(dir);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(dir, path));
     FILESYSTEM_M(list_impl(path, only_file, files, exists));
 }
 
 Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
-    auto orig_path = absolute_path(orig_name);
-    auto new_path = absolute_path(new_name);
+    Path orig_path;
+    RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
+    Path new_path;
+    RETURN_IF_ERROR(absolute_path(new_name, new_path));
     FILESYSTEM_M(rename_impl(orig_path, new_path));
 }
 
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index a8cdf5f4eb6..dd6a63222a5 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -144,11 +144,13 @@ protected:
     /// rename file from orig_name to new_name
     virtual Status rename_impl(const Path& orig_name, const Path& new_name) = 0;
 
-    virtual Path absolute_path(const Path& path) const {
+    virtual Status absolute_path(const Path& path, Path& abs_path) const {
         if (path.is_absolute()) {
-            return path;
+            abs_path = path;
+        } else {
+            abs_path = _root_path / path;
         }
-        return _root_path / path;
+        return Status::OK();
     }
 
     FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
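
For reference, the shape of this API change can be sketched outside the Doris tree. In the sketch below, Status and RETURN_IF_ERROR are minimal stand-ins for the real Doris types, not their actual definitions; the point is that absolute_path() may now fail (for example on an unsupported scheme) and the caller propagates that failure instead of receiving a Path by value:

    #include <filesystem>
    #include <iostream>
    #include <string>

    // Minimal stand-ins for doris::Status and RETURN_IF_ERROR (assumptions,
    // not the real definitions).
    struct Status {
        bool ok_ = true;
        std::string msg_;
        static Status OK() { return {}; }
        static Status InternalError(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return ok_; }
    };

    #define RETURN_IF_ERROR(stmt)     \
        do {                          \
            Status _s = (stmt);       \
            if (!_s.ok()) return _s;  \
        } while (0)

    using Path = std::filesystem::path;

    // New signature: the result comes back through an out-parameter and the
    // return value carries success or failure.
    Status absolute_path(const Path& root, const Path& path, Path& abs_path) {
        if (path.is_absolute()) {
            abs_path = path;
        } else {
            abs_path = root / path;
        }
        return Status::OK();
    }

    // A caller in the style of FileSystem::delete_file(): any error from
    // absolute_path() short-circuits out of the function.
    Status delete_file(const Path& root, const Path& file) {
        Path path;
        RETURN_IF_ERROR(absolute_path(root, file, path));
        std::cout << "would delete " << path << "\n";
        return Status::OK();
    }

    int main() {
        return delete_file("/data", "tablet/0001.dat").ok() ? 0 : 1;
    }
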
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 2c4b6bb5f82..9370c4cae5b 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -146,7 +146,8 @@ Status LocalFileSystem::delete_directory_impl(const Path& dir) {
 }
 
 Status LocalFileSystem::delete_directory_or_file(const Path& path) {
-    auto the_path = absolute_path(path);
+    Path the_path;
+    RETURN_IF_ERROR(absolute_path(path, the_path));
     FILESYSTEM_M(delete_directory_or_file_impl(the_path));
 }
 
@@ -248,8 +249,10 @@ Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name)
 }
 
 Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
-    auto src_file = absolute_path(src);
-    auto dest_file = absolute_path(dest);
+    Path src_file;
+    RETURN_IF_ERROR(absolute_path(src, src_file));
+    Path dest_file;
+    RETURN_IF_ERROR(absolute_path(dest, dest_file));
     FILESYSTEM_M(link_file_impl(src_file, dest_file));
 }
 
@@ -272,7 +275,8 @@ Status LocalFileSystem::canonicalize(const Path& path, std::string* real_path) {
 }
 
 Status LocalFileSystem::is_directory(const Path& path, bool* res) {
-    auto tmp_path = absolute_path(path);
+    Path tmp_path;
+    RETURN_IF_ERROR(absolute_path(path, tmp_path));
     std::error_code ec;
     *res = std::filesystem::is_directory(tmp_path, ec);
     if (ec) {
@@ -282,7 +286,8 @@ Status LocalFileSystem::is_directory(const Path& path, bool* res) {
 }
 
 Status LocalFileSystem::md5sum(const Path& file, std::string* md5sum) {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(md5sum_impl(path, md5sum));
 }
 
@@ -318,8 +323,9 @@ Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {
 
 Status LocalFileSystem::iterate_directory(const std::string& dir,
                                           const std::function<bool(const FileInfo& file)>& cb) {
-    auto path = absolute_path(dir);
-    FILESYSTEM_M(iterate_directory_impl(dir, cb));
+    Path path;
+    RETURN_IF_ERROR(absolute_path(dir, path));
+    FILESYSTEM_M(iterate_directory_impl(path, cb));
 }
 
 Status LocalFileSystem::iterate_directory_impl(
@@ -336,7 +342,8 @@ Status LocalFileSystem::iterate_directory_impl(
 }
 
 Status LocalFileSystem::get_space_info(const Path& dir, size_t* capacity, size_t* available) {
-    auto path = absolute_path(dir);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(dir, path));
     FILESYSTEM_M(get_space_info_impl(path, capacity, available));
 }
 
@@ -353,8 +360,10 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity,
 }
 
 Status LocalFileSystem::copy_path(const Path& src, const Path& dest) {
-    auto src_path = absolute_path(src);
-    auto dest_path = absolute_path(dest);
+    Path src_path;
+    RETURN_IF_ERROR(absolute_path(src, src_path));
+    Path dest_path;
+    RETURN_IF_ERROR(absolute_path(dest, dest_path));
     FILESYSTEM_M(copy_path_impl(src_path, dest_path));
 }
 
@@ -455,7 +464,8 @@ Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::strin
 }
 
 Status LocalFileSystem::permission(const Path& file, std::filesystem::perms prms) {
-    auto path = absolute_path(file);
+    Path path;
+    RETURN_IF_ERROR(absolute_path(file, path));
     FILESYSTEM_M(permission_impl(path, prms));
 }
 
@@ -468,5 +478,55 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
     return Status::OK();
 }
 
+Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
+    // valid paths include:
+    //   1. abc/def                         will return abc/def
+    //   2. /abc/def                        will return /abc/def
+    //   3. file:/abc/def                   will return /abc/def
+    //   4. file://<authority>/abc/def      will return /abc/def
+    std::string path_str = input_path_str;
+    size_t slash = path_str.find('/');
+    if (slash == 0) {
+        abs_path = input_path_str;
+        return Status::OK();
+    }
+
+    // Initialize scheme and authority
+    std::string scheme;
+    size_t start = 0;
+
+    // Parse URI scheme
+    size_t colon = path_str.find(':');
+    if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
+        // Has a scheme
+        scheme = path_str.substr(0, colon);
+        if (scheme != "file") {
+            return Status::InternalError(
+                    "Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
+        }
+        start = colon + 1;
+    }
+
+    // Parse URI authority, if any
+    if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
+        // Has authority
+        // such as : path_str = "file://authority/abc/def"
+        // and now : start = 5
+        size_t next_slash = path_str.find('/', start + 2);
+        // now : next_slash = 16
+        if (next_slash == std::string::npos) {
+            return Status::InternalError(
+                    "This input string only has an authority, but no path information");
+        }
+        // We will skip the authority
+        // now : start = 16
+        start = next_slash;
+    }
+
+    // URI path is the rest of the string
+    abs_path = path_str.substr(start);
+    return Status::OK();
+}
+
 } // namespace io
 } // namespace doris
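
The parsing rules documented in convert_to_abs_path() can be exercised with a standalone sketch (same logic, but std::string in place of Path and an exception in place of Status, so it compiles without the Doris headers):

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Plain and absolute paths pass through, a "file" scheme is stripped, and
    // an authority ("file://host[:port]/...") is skipped; other schemes are
    // rejected, mirroring the function above.
    std::string to_abs_path(const std::string& path_str) {
        size_t slash = path_str.find('/');
        if (slash == 0) {
            return path_str;  // already absolute: /abc/def
        }
        size_t start = 0;
        size_t colon = path_str.find(':');
        if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
            if (path_str.substr(0, colon) != "file") {
                throw std::invalid_argument("only the 'file' scheme is supported");
            }
            start = colon + 1;
        }
        if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
            size_t next_slash = path_str.find('/', start + 2);
            if (next_slash == std::string::npos) {
                throw std::invalid_argument("authority without a path");
            }
            start = next_slash;  // skip "//host[:port]"
        }
        return path_str.substr(start);
    }

    int main() {
        for (const char* p : {"abc/def", "/abc/def", "file:/abc/def", "file://host:80/abc/def"}) {
            std::cout << p << " -> " << to_abs_path(p) << "\n";
        }
    }
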
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 8578b9f5ac2..d765f2daeec 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "common/exception.h"
 #include "common/status.h"
 #include "io/fs/file_system.h"
 #include "io/fs/path.h"
@@ -33,6 +34,7 @@ namespace doris::io {
 class LocalFileSystem final : public FileSystem {
 public:
     static std::shared_ptr<LocalFileSystem> create(Path path, std::string id = "");
+    static Status convert_to_abs_path(const Path& path, Path& abs_path);
     ~LocalFileSystem() override;
 
     /// hard link dest file to src file
@@ -98,6 +100,10 @@ protected:
     Status copy_path_impl(const Path& src, const Path& dest);
     Status permission_impl(const Path& file, std::filesystem::perms prms);
 
+    Status absolute_path(const Path& path, Path& abs_path) const override {
+        return convert_to_abs_path(path, abs_path);
+    }
+
 private:
     // a wrapper for glob(), return file list in "res"
     Status _glob(const std::string& pattern, std::vector<std::string>* res);
diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp
index dc4830be41e..dcde80b9d21 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -30,7 +30,8 @@ namespace doris {
 namespace io {
 
 Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
-    auto dest_path = absolute_path(dest_file);
+    Path dest_path;
+    RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
     FILESYSTEM_M(upload_impl(local_file, dest_path));
 }
 
@@ -38,13 +39,16 @@ Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
                                       const std::vector<Path>& remote_files) {
     std::vector<Path> remote_paths;
     for (auto& path : remote_files) {
-        remote_paths.push_back(absolute_path(path));
+        Path abs_path;
+        RETURN_IF_ERROR(absolute_path(path, abs_path));
+        remote_paths.push_back(abs_path);
     }
     FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
 }
 
 Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
-    auto remote_path = absolute_path(remote_file);
+    Path remote_path;
+    RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
     FILESYSTEM_M(download_impl(remote_path, local));
 }
 
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 26bf8186a93..2346d1b2f43 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -91,16 +91,17 @@ protected:
                              const std::vector<Path>& remote_files) override;
     Status download_impl(const Path& remote_file, const Path& local_file) override;
 
-    Path absolute_path(const Path& path) const override {
+    Status absolute_path(const Path& path, Path& abs_path) const override {
         if (path.string().find("://") != std::string::npos) {
             // the path is with schema, which means this is a full path like:
             // s3://bucket/path/to/file.txt
             // so no need to concat with prefix
-            return path;
+            abs_path = path;
         } else {
             // path with no schema
-            return _root_path / path;
+            abs_path = _root_path / path;
         }
+        return Status::OK();
     }
 
 private:
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 9097b65718f..410e5eb7a1f 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
             return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
         }
         _name_to_field.emplace(_fields[i].name, &_fields[i]);
+        if (_fields[i].field_id != -1) {
+            _field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
+        }
     }
 
     if (_next_schema_pos != t_schemas.size()) {
@@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
     return Status::OK();
 }
 
+const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
+    auto const it = _field_id_name_mapping.find(id);
+    if (it == _field_id_name_mapping.end()) {
+        return {};
+    }
+    return {it->second.data()};
+}
+
 Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                          size_t curr_pos, FieldSchema* node_field) {
     if (curr_pos >= t_schemas.size()) {
@@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
         node_field->type.add_sub_type(child->type);
         node_field->is_nullable = false;
         _next_schema_pos = curr_pos + 1;
+        node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
     } else {
         bool is_optional = is_optional_node(t_schema);
         if (is_optional) {
@@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
     auto type = get_doris_type(physical_schema);
     physical_field->type = type.first;
     physical_field->is_type_compatibility = type.second;
+    physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
 }
 
 std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
@@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
         group_field->type.type = TYPE_ARRAY;
         group_field->type.add_sub_type(struct_field->type);
         group_field->is_nullable = false;
+        group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
     } else {
         RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
     }
@@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
     list_field->type.type = TYPE_ARRAY;
     list_field->type.add_sub_type(list_field->children[0].type);
     list_field->is_nullable = is_optional;
+    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;
 
     return Status::OK();
 }
@@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
     map_field->type.add_sub_type(map_kv_field->type.children[0]);
     map_field->type.add_sub_type(map_kv_field->type.children[1]);
     map_field->is_nullable = is_optional;
+    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;
 
     return Status::OK();
 }
@@ -619,6 +635,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
     struct_field->name = to_lower(struct_schema.name);
     struct_field->is_nullable = is_optional;
     struct_field->type.type = TYPE_STRUCT;
+    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
+
     for (int i = 0; i < num_children; ++i) {
         struct_field->type.add_sub_type(struct_field->children[i].type,
                                         struct_field->children[i].name);
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index ca726ef1b57..3a139e3c456 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -28,6 +28,7 @@
 
 #include "common/status.h"
 #include "runtime/types.h"
+#include "util/slice.h"
 
 namespace doris::vectorized {
 
@@ -56,6 +57,7 @@ struct FieldSchema {
     ~FieldSchema() = default;
     FieldSchema(const FieldSchema& fieldSchema) = default;
     std::string debug_string() const;
+    int32_t field_id;
 };
 
 class FieldDescriptor {
@@ -68,6 +70,7 @@ private:
     std::unordered_map<std::string, const FieldSchema*> _name_to_field;
     // Used in from_thrift, marking the next schema position that should be parsed
     size_t _next_schema_pos;
+    std::unordered_map<int, std::string> _field_id_name_mapping;
 
     void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                               FieldSchema* physical_field);
@@ -128,6 +131,10 @@ public:
     std::string debug_string() const;
 
     int32_t size() const { return _fields.size(); }
+
+    bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
+
+    const doris::Slice get_column_name_from_field_id(int32_t id) const;
 };
 
 } // namespace doris::vectorized
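
The field-id bookkeeping added here can be illustrated with a self-contained stand-in for FieldDescriptor (FieldIdIndex below is hypothetical, not a Doris class): schema elements that carry a Parquet field_id (anything other than -1) are indexed while parsing, and a lookup miss yields an empty name, mirroring the empty Slice returned by get_column_name_from_field_id():

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    class FieldIdIndex {
    public:
        // Record a column only when the file actually carries a field id.
        void add(int32_t field_id, const std::string& name) {
            if (field_id != -1) {
                _field_id_name_mapping.emplace(field_id, name);
            }
        }
        bool has_field_ids() const { return !_field_id_name_mapping.empty(); }
        // Empty result means "this id is unknown in the file".
        std::string name_of(int32_t id) const {
            auto it = _field_id_name_mapping.find(id);
            return it == _field_id_name_mapping.end() ? std::string() : it->second;
        }

    private:
        std::unordered_map<int32_t, std::string> _field_id_name_mapping;
    };

    int main() {
        FieldIdIndex idx;
        idx.add(1, "c1");
        idx.add(-1, "legacy_col");  // no field id in the file: not indexed
        std::cout << idx.name_of(1) << "\n";                             // c1
        std::cout << (idx.name_of(2).empty() ? "miss" : "hit") << "\n";  // miss
    }
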
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 32391d1f0da..0f204172fc6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
-// Get iceberg col id to col name map stored in parquet metadata key values.
-// This is for iceberg schema evolution.
-std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
-    return _t_metadata->key_value_metadata;
+const FieldDescriptor ParquetReader::get_file_metadata_schema() {
+    return _file_metadata->schema();
 }
 
 Status ParquetReader::open() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 1d70f9ab5d5..1928ebe6aa3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -148,7 +148,7 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;
 
-    std::vector<tparquet::KeyValue> get_metadata_key_values();
+    const FieldDescriptor get_file_metadata_schema();
     void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
         _table_col_to_file_col = map;
     }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 295a3a40544..8f130ca6002 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -53,6 +53,7 @@
 #include "vec/exec/format/format_common.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
+#include "vec/exec/format/parquet/schema_desc.h"
 #include "vec/exec/format/table/table_format_reader.h"
 
 namespace cctz {
@@ -546,8 +547,8 @@ Status IcebergParquetReader::init_reader(
     _col_id_name_map = col_id_name_map;
     _file_col_names = file_col_names;
     _colname_to_value_range = colname_to_value_range;
-    auto parquet_meta_kv = parquet_reader->get_metadata_key_values();
-    RETURN_IF_ERROR(_gen_col_name_maps(parquet_meta_kv));
+    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+    RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
@@ -672,39 +673,20 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
  * 1. col1_new -> col1
  * 2. col1 -> col1_new
  */
-Status IcebergParquetReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) {
-    for (int i = 0; i < parquet_meta_kv.size(); ++i) {
-        tparquet::KeyValue kv = parquet_meta_kv[i];
-        if (kv.key == "iceberg.schema") {
-            _has_iceberg_schema = true;
-            std::string schema = kv.value;
-            rapidjson::Document json;
-            json.Parse(schema.c_str());
-
-            if (json.HasMember("fields")) {
-                rapidjson::Value& fields = json["fields"];
-                if (fields.IsArray()) {
-                    for (int j = 0; j < fields.Size(); j++) {
-                        rapidjson::Value& e = fields[j];
-                        rapidjson::Value& id = e["id"];
-                        rapidjson::Value& name = e["name"];
-                        std::string name_string = name.GetString();
-                        transform(name_string.begin(), name_string.end(), name_string.begin(),
-                                  ::tolower);
-                        auto iter = _col_id_name_map.find(id.GetInt());
-                        if (iter != _col_id_name_map.end()) {
-                            _table_col_to_file_col.emplace(iter->second, name_string);
-                            _file_col_to_table_col.emplace(name_string, iter->second);
-                            if (name_string != iter->second) {
-                                _has_schema_change = true;
-                            }
-                        } else {
-                            _has_schema_change = true;
-                        }
-                    }
+Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+    if (field_desc.has_parquet_field_id()) {
+        for (const auto& pair : _col_id_name_map) {
+            auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
+            if (name_slice.get_size() == 0) {
+                _has_schema_change = true;
+            } else {
+                auto name_string = name_slice.to_string();
+                _table_col_to_file_col.emplace(pair.second, name_string);
+                _file_col_to_table_col.emplace(name_string, pair.second);
+                if (name_string != pair.second) {
+                    _has_schema_change = true;
                 }
             }
-            break;
         }
     }
     return Status::OK();
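
A rough standalone model of the rewritten mapping (hypothetical free function; the real code keeps these maps as members and reads the file side from FieldDescriptor): for each Iceberg column id in the table schema, look up the column name recorded in the Parquet file's field ids; a missing id or a renamed column marks a schema change.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <unordered_map>

    struct ColNameMaps {
        std::unordered_map<std::string, std::string> table_to_file;
        std::unordered_map<std::string, std::string> file_to_table;
        bool has_schema_change = false;
    };

    ColNameMaps gen_col_name_maps(const std::map<int32_t, std::string>& col_id_to_table_name,
                                  const std::unordered_map<int32_t, std::string>& file_field_ids) {
        ColNameMaps out;
        for (const auto& [id, table_name] : col_id_to_table_name) {
            auto it = file_field_ids.find(id);
            if (it == file_field_ids.end()) {
                out.has_schema_change = true;  // column added after the file was written
            } else {
                out.table_to_file.emplace(table_name, it->second);
                out.file_to_table.emplace(it->second, table_name);
                if (it->second != table_name) {
                    out.has_schema_change = true;  // column renamed since the file was written
                }
            }
        }
        return out;
    }

    int main() {
        auto maps = gen_col_name_maps({{1, "c1_new"}, {2, "c2"}}, {{1, "c1"}, {2, "c2"}});
        std::cout << "schema change: " << maps.has_schema_change << "\n";  // 1: c1 was renamed
    }
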
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 04f64aad518..2e240f465b6 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -218,7 +218,7 @@ public:
         parquet_reader->set_delete_rows(&_iceberg_delete_rows);
     }
 
-    Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
+    Status _gen_col_name_maps(const FieldDescriptor& field_desc);
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp
index 5cfdb9ac73b..e15ba34a254 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -407,4 +407,55 @@ TEST_F(LocalFileSystemTest, TestGlob) {
     EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok());
 }
 
+TEST_F(LocalFileSystemTest, TestConvertToAbsPath) {
+    io::Path abs_path;
+    Status st;
+
+    // supported paths:
+    st = doris::io::LocalFileSystem::convert_to_abs_path("/abc/def", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/abc/def", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file:/def/hij", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/def/hij", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file://host:80/hij/abc", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/hij/abc", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file://host/abc/def", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/abc/def", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file:///def", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/def", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file:///", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth/", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("/", abs_path);
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("abc", abs_path);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ("abc", abs_path);
+
+    // unsupported paths:
+    st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth", abs_path);
+    ASSERT_TRUE(!st.ok());
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("fileee:/abc", abs_path);
+    ASSERT_TRUE(!st.ok());
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:///abc", abs_path);
+    ASSERT_TRUE(!st.ok());
+
+    st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:/abc", abs_path);
+    ASSERT_TRUE(!st.ok());
+}
+
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 5a05baf3336..ddbf7e5e4e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
@@ -143,6 +144,7 @@ public abstract class ExternalCatalog
 
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
+    protected PreExecutionAuthenticator preExecutionAuthenticator;
 
     public ExternalCatalog() {
     }
@@ -913,4 +915,8 @@ public abstract class ExternalCatalog
     public String getQualifiedName(String dbName) {
         return String.join(".", name, dbName);
     }
+
+    public PreExecutionAuthenticator getPreExecutionAuthenticator() {
+        return preExecutionAuthenticator;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 20b9482041d..85b999f1111 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
@@ -34,6 +35,7 @@ import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
@@ -88,7 +90,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private boolean enableHmsEventsIncrementalSync = false;
 
     //for "type" = "hms" , but is iceberg table.
-    private HiveCatalog icebergHiveCatalog;
+    private IcebergMetadataOps icebergMetadataOps;
 
     @VisibleForTesting
     public HMSExternalCatalog() {
@@ -168,6 +170,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     @Override
     protected void initLocalObjectsImpl() {
+        preExecutionAuthenticator = new PreExecutionAuthenticator();
         if (authenticator == null) {
             AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
             authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
@@ -199,8 +202,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
         transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
                 fileSystemExecutor);
         metadataOps = hiveOps;
-
-        icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
     }
 
     @Override
@@ -337,10 +338,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return enableHmsEventsIncrementalSync;
     }
 
-    public HiveCatalog getIcebergHiveCatalog() {
-        return icebergHiveCatalog;
-    }
-
     /**
      * Enum for meta tables in hive catalog.
      * eg: tbl$partitions
@@ -393,5 +390,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
             }
         }
     }
+
+    public IcebergMetadataOps getIcebergMetadataOps() {
+        makeSureInitialized();
+        if (icebergMetadataOps == null) {
+            HiveCatalog icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
+            icebergMetadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, icebergHiveCatalog);
+        }
+        return icebergMetadataOps;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index d8dfd1c128f..0fa69825a01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -40,11 +40,10 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String ICEBERG_HADOOP = "hadoop";
     public static final String ICEBERG_GLUE = "glue";
     public static final String ICEBERG_DLF = "dlf";
+    public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
 
-    protected PreExecutionAuthenticator preExecutionAuthenticator;
-
     public IcebergExternalCatalog(long catalogId, String name, String comment) {
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index c1ac2a79754..ad347ca78f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -36,8 +36,6 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.HashMap;
@@ -104,18 +102,16 @@ public class IcebergMetadataCache {
 
     @NotNull
     private Table loadTable(IcebergMetadataCacheKey key) {
-        Catalog icebergCatalog;
+        IcebergMetadataOps ops;
         if (key.catalog instanceof HMSExternalCatalog) {
-            icebergCatalog = ((HMSExternalCatalog) key.catalog).getIcebergHiveCatalog();
+            ops = ((HMSExternalCatalog) key.catalog).getIcebergMetadataOps();
         } else if (key.catalog instanceof IcebergExternalCatalog) {
-            icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog();
+            ops = (IcebergMetadataOps) (((IcebergExternalCatalog) key.catalog).getMetadataOps());
         } else {
             throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
-        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
-                () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)));
-        initIcebergTableFileIO(icebergTable, key.catalog.getProperties());
-        return icebergTable;
+        return HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
+            () -> ops.loadTable(key.dbName, key.tableName));
     }
 
     public void invalidateCatalogCache(long catalogId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 500f9728961..c2070f4bad4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -36,6 +36,7 @@ import org.apache.doris.datasource.operations.ExternalMetadataOps;
 
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -46,29 +47,39 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class IcebergMetadataOps implements ExternalMetadataOps {
 
     private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class);
     protected Catalog catalog;
-    protected IcebergExternalCatalog dorisCatalog;
+    protected ExternalCatalog dorisCatalog;
     protected SupportsNamespaces nsCatalog;
     private PreExecutionAuthenticator preExecutionAuthenticator;
+    // Generally, there are only two levels under the catalog, namely <database>.<table>,
+    // but a REST catalog is backed by an external server, which may expose
+    // three levels: <catalog>.<database>.<table>. If the external server provides
+    // a catalog level, it is recorded here to keep the naming semantics consistent.
+    private Optional<String> externalCatalogName = Optional.empty();
 
-    public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
+    public IcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
         this.dorisCatalog = dorisCatalog;
         this.catalog = catalog;
         nsCatalog = (SupportsNamespaces) catalog;
-        this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator;
-
+        this.preExecutionAuthenticator = dorisCatalog.getPreExecutionAuthenticator();
+        if (dorisCatalog.getProperties().containsKey(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)) {
+            externalCatalogName =
+                Optional.of(dorisCatalog.getProperties().get(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME));
+        }
     }
 
     public Catalog getCatalog() {
         return catalog;
     }
 
-    public IcebergExternalCatalog getExternalCatalog() {
+    public ExternalCatalog getExternalCatalog() {
         return dorisCatalog;
     }
 
@@ -78,17 +89,18 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public boolean tableExist(String dbName, String tblName) {
-        return catalog.tableExists(TableIdentifier.of(dbName, tblName));
+        return catalog.tableExists(getTableIdentifier(dbName, tblName));
     }
 
     public boolean databaseExist(String dbName) {
-        return nsCatalog.namespaceExists(Namespace.of(dbName));
+        return nsCatalog.namespaceExists(getNamespace(dbName));
     }
 
     public List<String> listDatabaseNames() {
         try {
-            return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream()
-                   .map(Namespace::toString)
+            return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces(getNamespace())
+                   .stream()
+                   .map(n -> n.level(n.length() - 1))
                    .collect(Collectors.toList()));
         } catch (Exception e) {
             throw new RuntimeException("Failed to list database names, error message is: " + e.getMessage());
@@ -97,7 +109,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
 
     @Override
     public List<String> listTableNames(String dbName) {
-        List<TableIdentifier> tableIdentifiers = catalog.listTables(Namespace.of(dbName));
+        List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName));
         return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
     }
 
@@ -127,12 +139,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
                 ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
             }
         }
-        String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
-        if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
-            throw new DdlException(
+        if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) {
+            String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType();
+            if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
+                throw new DdlException(
                     "Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
+            }
         }
-        nsCatalog.createNamespace(Namespace.of(dbName), properties);
+        nsCatalog.createNamespace(getNamespace(dbName), properties);
         dorisCatalog.onRefreshCache(true);
     }
 
@@ -159,7 +173,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
             }
         }
         SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
-        nsCatalog.dropNamespace(Namespace.of(dbName));
+        nsCatalog.dropNamespace(getNamespace(dbName));
         dorisCatalog.onRefreshCache(true);
     }
 
@@ -199,7 +213,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         Map<String, String> properties = stmt.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
         PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
-        catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
+        catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
         db.setUnInitialized(true);
         return false;
     }
@@ -237,7 +251,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
                 ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
             }
         }
-        catalog.dropTable(TableIdentifier.of(dbName, tableName), true);
+        catalog.dropTable(getTableIdentifier(dbName, tableName), true);
         db.setUnInitialized(true);
     }
 
@@ -249,4 +263,25 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
     public PreExecutionAuthenticator getPreExecutionAuthenticator() {
         return preExecutionAuthenticator;
     }
+
+    @Override
+    public Table loadTable(String dbName, String tblName) {
+        return catalog.loadTable(getTableIdentifier(dbName, tblName));
+    }
+
+    private TableIdentifier getTableIdentifier(String dbName, String tblName) {
+        return externalCatalogName
+            .map(s -> TableIdentifier.of(s, dbName, tblName))
+            .orElseGet(() -> TableIdentifier.of(dbName, tblName));
+    }
+
+    private Namespace getNamespace(String dbName) {
+        return externalCatalogName
+            .map(s -> Namespace.of(s, dbName))
+            .orElseGet(() -> Namespace.of(dbName));
+    }
+
+    private Namespace getNamespace() {
+        return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
+    }
 }
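
The two-level versus three-level naming handled by getNamespace()/getTableIdentifier() can be restated as a small sketch (written in C++ with std::optional purely for illustration; make_namespace is a hypothetical name, not part of either codebase): with no external catalog name the namespace is just the database; when the REST server exposes <catalog>.<database>.<table>, the recorded name is prepended.

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    std::vector<std::string> make_namespace(const std::optional<std::string>& external_catalog,
                                            const std::string& db) {
        if (external_catalog) {
            // Three-level server: <external_catalog>.<db>
            return {*external_catalog, db};
        }
        // Ordinary two-level case: <db>
        return {db};
    }

    int main() {
        for (const auto& level : make_namespace(std::optional<std::string>{"unity"}, "test_db")) {
            std::cout << level << '.';
        }
        std::cout << "tbl\n";  // unity.test_db.tbl
    }
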
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index 908a4fa9e3f..b92d2c91f96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.aws.AwsClientProperties;
-import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.aws.s3.S3FileIOProperties;
 
 import java.util.HashMap;
@@ -71,7 +70,6 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
         Map<String, String> props = catalogProperty.getProperties();
         Map<String, String> restProperties = new HashMap<>(props);
-        restProperties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
         restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
         String restUri = props.getOrDefault(CatalogProperties.URI, "");
         restProperties.put(CatalogProperties.URI, restUri);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 685915025d6..d0cca11b0af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -22,6 +22,7 @@ package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
 import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
@@ -140,7 +141,7 @@ public class IcebergTransaction implements Transaction {
 
     private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
         Objects.requireNonNull(tableInfo);
-        IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+        ExternalCatalog externalCatalog = ops.getExternalCatalog();
         return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
index 4a2757f918f..50166fe8305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.datasource.operations;
 
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 
@@ -34,7 +34,7 @@ public class ExternalMetadataOperations {
         return new HiveMetadataOps(hiveConf, jdbcClientConfig, catalog);
     }
 
-    public static IcebergMetadataOps newIcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
+    public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
         return new IcebergMetadataOps(dorisCatalog, catalog);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index 0333124b352..e5ed129c679 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -91,6 +91,10 @@ public interface ExternalMetadataOps {
 
     boolean databaseExist(String dbName);
 
+    default Object loadTable(String dbName, String tblName) {
+        throw new UnsupportedOperationException("Load table is not supported.");
+    }
+
     /**
      * close the connection, eg, to hms
      */
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out
new file mode 100644
index 00000000000..42414c36549
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out
@@ -0,0 +1,40 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+1      nWYHawtqUw      930
+2      uvOzzthsLV      166
+3      WIAehuXWkv      170
+4      wYCSvnJKTo      709
+5      VsslXsUIDZ      993
+6      ZLsACYYTFy      813
+7      BtDDvLeBpK      52
+8      YISVtrPfGr      8
+9      PBPJHDFjjC      45
+10     qbDuUJzJMO      756
+11     EjqqWoaLJn      712
+12     jpZLMdKXpn      847
+13     acpjQXpJCp      649
+14     nOKqHhRwao      133
+15     kxUUZEUoKv      398
+
+-- !q2 --
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
+-- !q3 --
+nWYHawtqUw     930
+wYCSvnJKTo     709
+VsslXsUIDZ     993
+ZLsACYYTFy     813
+qbDuUJzJMO     756
+EjqqWoaLJn     712
+jpZLMdKXpn     847
+acpjQXpJCp     649
+kxUUZEUoKv     398
+
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy
new file mode 100644
index 00000000000..48b8b6559ca
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_read_unitycatalog_table", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "iceberg_read_unitycatalog_table"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """ use ${catalog_name}.test_db """
+    String tb = "unitycatalog_marksheet_uniform"
+
+    qt_q1  """ select * from ${tb} order by c1 """ 
+    qt_q2  """ select c1 from ${tb} where c1 > 6 order by c1 """ 
+    qt_q3  """ select c2, c3 from ${tb} where c3 > 200 order by c1 """ 
+
+}
+
+/*
+
+spark-sql:
+    1. create table marksheet_uniform (c1 int, c2 string, c3 int);
+    2. get parquet file from marksheet_uniform; (ref: https://docs.unitycatalog.io/usage/tables/uniform/)
+    3. put parquet file to hdfs: hdfs dfs -put <parquet_file> hdfs://xxxxx
+    4. CALL <catalog_name>.system.add_files(
+            table => '<catalog_name>.unitycatalog_db.marksheet_uniform',
+            source_table => '`parquet`.`hdfs://172.20.32.136:8020/user/doris/preinstalled_data/iceberg_hadoop_warehouse/unitycatalog_db/marksheet_uniform_data/part-00000-5af50cc4-3218-465b-a3a4-eb4fc709421d-c000.snappy.parquet`'
+*/
\ No newline at end of file

