This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b32f8c6b003 [fix](hive/iceberg) rectify the fs name if path already contains fs (#49998)
b32f8c6b003 is described below

commit b32f8c6b00304695386e8ee50aae7d8da018fcf9
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Sat Apr 19 16:42:45 2025 -0700

    [fix](hive/iceberg) rectify the fs name if path already contains fs (#49998)

    ### What problem does this PR solve?

    PR #34520 only handled HivePartitionWriter, but the same fix should be
    applied to all HDFS writers. This PR fixes that by unifying the logic so
    it works with both the Hive and Iceberg writers.

    If the path is an absolute, fully qualified path such as
    `hdfs://host/path/to/file`, use `hdfs://host/` as the fs name;
    otherwise, use the default fs name.
---
 be/src/io/file_factory.cpp                          |  32 +++++++++++++++------
 be/src/io/file_factory.h                            |   3 ++
 be/src/vec/sink/writer/vhive_partition_writer.cpp   |  13 ---------
 .../iceberg/write/test_iceberg_write_insert.out     | Bin 142167 -> 162889 bytes
 .../iceberg/write/test_iceberg_write_insert.groovy  |  23 ++++++++++++++-
 5 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 27dccd630f2..4932236c7c6 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -89,20 +89,36 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_
                 *fs_properties.properties, s3_uri, &s3_conf));
         return io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID);
     }
-    case TFileType::FILE_HDFS:
-        return fs_properties.hdfs_params
-                       ? io::HdfsFileSystem::create(*fs_properties.hdfs_params,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, nullptr)
-                       : io::HdfsFileSystem::create(*fs_properties.properties,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, nullptr);
+    case TFileType::FILE_HDFS: {
+        std::string fs_name = _get_fs_name(file_description);
+        return io::HdfsFileSystem::create(*fs_properties.properties, fs_name,
+                                          io::FileSystem::TMP_FS_ID, nullptr);
+    }
     default:
         return ResultError(Status::InternalError("unsupported fs type: {}",
                                                  std::to_string(fs_properties.type)));
     }
 }
 
+std::string FileFactory::_get_fs_name(const io::FileDescription& file_description) {
+    // If the destination path contains a scheme, use it directly.
+    // Otherwise, keep the original file_description.fs_name, because the
+    // default fs name in file_description.fs_name may differ from the file's.
+    // example:
+    //   hdfs://host:port/path1/path2 --> hdfs://host:port
+    //   hdfs://nameservice/path1/path2 --> hdfs://nameservice
+    std::string fs_name = file_description.fs_name;
+    string::size_type idx = file_description.path.find("://");
+    if (idx != string::npos) {
+        idx = file_description.path.find("/", idx + 3);
+        if (idx != string::npos) {
+            fs_name = file_description.path.substr(0, idx);
+        }
+    }
+    return fs_name;
+}
+
 Result<io::FileWriterPtr> FileFactory::create_file_writer(
         TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses,
         const std::map<std::string, std::string>& properties, const std::string& path,
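For readers who want to try the resolution rule in isolation: it boils down to
"take everything before the first '/' after '://', if any; otherwise keep the
default". Below is a minimal standalone sketch of that rule. The helper name
extract_fs_name and the host/port values are illustrative only, not part of the
Doris sources; the assertions reuse the examples from the code comment above.

    // fs_name_demo.cpp -- standalone sketch mirroring the extraction rule
    // in FileFactory::_get_fs_name above. Build: g++ -std=c++17 fs_name_demo.cpp
    #include <cassert>
    #include <iostream>
    #include <string>

    // Return the scheme+authority prefix of `path` if present, else `default_fs`.
    std::string extract_fs_name(const std::string& path, const std::string& default_fs) {
        std::string::size_type idx = path.find("://");
        if (idx != std::string::npos) {
            // Look for the first '/' after "://", i.e. the end of the authority.
            idx = path.find('/', idx + 3);
            if (idx != std::string::npos) {
                return path.substr(0, idx);
            }
        }
        return default_fs;
    }

    int main() {
        // Path carries its own fs name: the scheme+authority wins.
        assert(extract_fs_name("hdfs://host:8020/path1/path2", "hdfs://default:8020")
               == "hdfs://host:8020");
        // Nameservice form works the same way.
        assert(extract_fs_name("hdfs://nameservice/path1/path2", "hdfs://default:8020")
               == "hdfs://nameservice");
        // Relative path: fall back to the default fs name.
        assert(extract_fs_name("path1/path2", "hdfs://default:8020")
               == "hdfs://default:8020");
        // No '/' after the authority: the rule cannot split the URI,
        // so the default fs name is kept as well.
        assert(extract_fs_name("hdfs://host:8020", "hdfs://default:8020")
               == "hdfs://default:8020");
        std::cout << "all cases pass\n";
    }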
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index afa54e22166..00c7122bfda 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -122,6 +122,9 @@ public:
         }
         throw Exception(Status::FatalError("__builtin_unreachable"));
     }
+
+private:
+    static std::string _get_fs_name(const io::FileDescription& file_description);
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 9f8f1041a01..8a5e2a9777e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -61,19 +61,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
             .fs_name {}};
-    // If the destination path contains a schema, use the schema directly.
-    // If not, use defaultFS.
-    // Otherwise a write error will occur.
-    // example:
-    //   hdfs://host:port/path1/path2 --> hdfs://host:port
-    //   hdfs://nameservice/path1/path2 --> hdfs://nameservice
-    string::size_type idx = file_description.path.find("://");
-    if (idx != string::npos) {
-        idx = file_description.path.find("/", idx + 3);
-        if (idx != string::npos) {
-            file_description.fs_name = file_description.path.substr(0, idx);
-        }
-    }
     _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
     io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
     RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
index d90434aa9c1..006596a760f 100644
Binary files a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out and b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out differ
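The regression changes below add a case where fs.defaultFS is deliberately
wrong, carrying a path suffix (hdfs://host:port/tmp). Under the unified logic
this is harmless whenever the writer receives a fully qualified path, because
the fs name is re-derived from the path itself. Continuing the sketch above
with the same hypothetical extract_fs_name helper (the warehouse path here is
made up for illustration):

    // fs.defaultFS mistakenly includes a path suffix, as in the new test below.
    std::string wrong_default = "hdfs://127.0.0.1:8020/tmp";
    // The Iceberg writer is handed a fully qualified data-file path.
    std::string path =
            "hdfs://127.0.0.1:8020/user/hive/warehouse/db/tbl/data/f.parquet";
    // The scheme+authority from the path wins over the bogus default,
    // so the write no longer fails.
    assert(extract_fs_name(path, wrong_default) == "hdfs://127.0.0.1:8020");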
suite("test_iceberg_write_insert", "p0,external,iceberg,external_docker,external 'use_meta_cache' = 'true' );""" - sql """use `${iceberg_catalog_name}`.`write_test`""" + sql """drop database if exists `${iceberg_catalog_name}`.`iceberg_write_test` force""" + sql """create database `${iceberg_catalog_name}`.`iceberg_write_test`""" + sql """use `${iceberg_catalog_name}`.`iceberg_write_test`""" sql """set enable_fallback_to_original_planner=false;""" @@ -842,6 +845,24 @@ suite("test_iceberg_write_insert", "p0,external,iceberg,external_docker,external sql """drop catalog if exists ${iceberg_catalog_name}""" sql """drop catalog if exists ${hive_catalog_name}""" + + // test with wrong fs.defaultFS + sql """create catalog if not exists ${iceberg_catalog_name} properties ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp', + 'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp', + 'use_meta_cache' = 'true' + );""" + + sql """drop database if exists `${iceberg_catalog_name}`.`wrong_fs_name` force""" + sql """create database `${iceberg_catalog_name}`.`wrong_fs_name`""" + sql """use `${iceberg_catalog_name}`.`wrong_fs_name`""" + + q01("parquet_zstd", iceberg_catalog_name) + + sql """drop catalog if exists ${iceberg_catalog_name}""" } finally { } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org