This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b32f8c6b003 [fix](hive/iceberg) rectify the fs name if path already 
contains fs (#49998)
b32f8c6b003 is described below

commit b32f8c6b00304695386e8ee50aae7d8da018fcf9
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Sat Apr 19 16:42:45 2025 -0700

    [fix](hive/iceberg) rectify the fs name if path already contains fs (#49998)
    
    ### What problem does this PR solve?
    
    In PR #34520, we only handled HivePartitionWriter, but the fix should
    be applied to all HDFS writers.
    This PR fixes that by unifying the logic so that it works with both the
    Hive and Iceberg writers.
    
    If the path is an absolute full path like `hdfs://host/path/to/file`,
    use `hdfs://host` as the fs name;
    otherwise, use the default fs name.
---
 be/src/io/file_factory.cpp                         |  32 +++++++++++++++------
 be/src/io/file_factory.h                           |   3 ++
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  13 ---------
 .../iceberg/write/test_iceberg_write_insert.out    | Bin 142167 -> 162889 bytes
 .../iceberg/write/test_iceberg_write_insert.groovy |  23 ++++++++++++++-
 5 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 27dccd630f2..4932236c7c6 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -89,20 +89,36 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const 
io::FSPropertiesRef& fs_
                 *fs_properties.properties, s3_uri, &s3_conf));
         return io::S3FileSystem::create(std::move(s3_conf), 
io::FileSystem::TMP_FS_ID);
     }
-    case TFileType::FILE_HDFS:
-        return fs_properties.hdfs_params
-                       ? io::HdfsFileSystem::create(*fs_properties.hdfs_params,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, 
nullptr)
-                       : io::HdfsFileSystem::create(*fs_properties.properties,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, 
nullptr);
+    case TFileType::FILE_HDFS: {
+        std::string fs_name = _get_fs_name(file_description);
+        return io::HdfsFileSystem::create(*fs_properties.properties, fs_name,
+                                          io::FileSystem::TMP_FS_ID, nullptr);
+    }
     default:
         return ResultError(Status::InternalError("unsupported fs type: {}",
                                                  
std::to_string(fs_properties.type)));
     }
 }
 
+std::string FileFactory::_get_fs_name(const io::FileDescription& 
file_description) {
+    // If the destination path contains a schema, use the schema directly.
+    // If not, use origin file_description.fs_name
+    // Because the default fsname in file_description.fs_name maybe different 
from
+    // file's.
+    // example:
+    //    hdfs://host:port/path1/path2  --> hdfs://host:port
+    //    hdfs://nameservice/path1/path2 --> hdfs://nameservice
+    std::string fs_name = file_description.fs_name;
+    string::size_type idx = file_description.path.find("://");
+    if (idx != string::npos) {
+        idx = file_description.path.find("/", idx + 3);
+        if (idx != string::npos) {
+            fs_name = file_description.path.substr(0, idx);
+        }
+    }
+    return fs_name;
+}
+
 Result<io::FileWriterPtr> FileFactory::create_file_writer(
         TFileType::type type, ExecEnv* env, const 
std::vector<TNetworkAddress>& broker_addresses,
         const std::map<std::string, std::string>& properties, const 
std::string& path,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index afa54e22166..00c7122bfda 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -122,6 +122,9 @@ public:
         }
         throw Exception(Status::FatalError("__builtin_unreachable"));
     }
+
+private:
+    static std::string _get_fs_name(const io::FileDescription& 
file_description);
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 9f8f1041a01..8a5e2a9777e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -61,19 +61,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profile)
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name()),
             .fs_name {}};
-    // If the destination path contains a schema, use the schema directly.
-    // If not, use defaultFS.
-    // Otherwise a write error will occur.
-    // example:
-    //    hdfs://host:port/path1/path2  --> hdfs://host:port
-    //    hdfs://nameservice/path1/path2 --> hdfs://nameservice
-    string::size_type idx = file_description.path.find("://");
-    if (idx != string::npos) {
-        idx = file_description.path.find("/", idx + 3);
-        if (idx != string::npos) {
-            file_description.fs_name = file_description.path.substr(0, idx);
-        }
-    }
     _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
     io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
     RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, 
&file_writer_options));
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
index d90434aa9c1..006596a760f 100644
Binary files 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 and 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
index 900006bc216..617d830e16d 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
@@ -818,6 +818,7 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
                 'iceberg.catalog.type'='hms',
                 'hive.metastore.uris' = 
'thrift://${externalEnvIp}:${hms_port}',
                 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
+                'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}',
                 'use_meta_cache' = 'true'
             );"""
             sql """drop catalog if exists ${hive_catalog_name}"""
@@ -828,7 +829,9 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
                 'use_meta_cache' = 'true'
             );"""
 
-            sql """use `${iceberg_catalog_name}`.`write_test`"""
+            sql """drop database if exists 
`${iceberg_catalog_name}`.`iceberg_write_test` force"""
+            sql """create database 
`${iceberg_catalog_name}`.`iceberg_write_test`"""
+            sql """use `${iceberg_catalog_name}`.`iceberg_write_test`"""
 
             sql """set enable_fallback_to_original_planner=false;"""
 
@@ -842,6 +845,24 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
 
             sql """drop catalog if exists ${iceberg_catalog_name}"""
             sql """drop catalog if exists ${hive_catalog_name}"""
+
+            // test with wrong fs.defaultFS
+            sql """create catalog if not exists ${iceberg_catalog_name} 
properties (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hms',
+                'hive.metastore.uris' = 
'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+                'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+                'use_meta_cache' = 'true'
+            );"""
+
+            sql """drop database if exists 
`${iceberg_catalog_name}`.`wrong_fs_name` force"""
+            sql """create database `${iceberg_catalog_name}`.`wrong_fs_name`"""
+            sql """use `${iceberg_catalog_name}`.`wrong_fs_name`"""
+
+            q01("parquet_zstd", iceberg_catalog_name)
+
+            sql """drop catalog if exists ${iceberg_catalog_name}"""
         } finally {
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to