This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-c108335-hive-sql in repository https://gitbox.apache.org/repos/asf/doris.git
commit 278d5bfa7582fd838e54b98312d8ccf47c25113a Author: morningman <yun...@selectdb.com> AuthorDate: Wed Mar 19 15:50:20 2025 +0800 [opt](oss) support oss root policy --- be/src/common/config.cpp | 5 ++ be/src/common/config.h | 2 + be/src/io/fs/hdfs_file_reader.cpp | 58 +++++++++-------- .../org/apache/doris/common/util/LocationPath.java | 29 ++++++--- .../apache/doris/datasource/ExternalCatalog.java | 3 + .../apache/doris/common/util/LocationPathTest.java | 74 ++++++++++++++++++++++ 6 files changed, 134 insertions(+), 37 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b349c7430b8..1d0bcacafbd 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1457,6 +1457,11 @@ DEFINE_mBool(enable_prune_delete_sign_when_base_compaction, "true"); DEFINE_mBool(enable_mow_verbose_log, "false"); +// When using jindofs to access oss-hdfs, +// some methods may not be compatible with the open source hdfs filesystem. +// Set this to true to NOT call these methods. 
+DEFINE_mBool(enable_oss_jindofs, "false"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 1c6948d4ee8..345c706e361 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1532,6 +1532,8 @@ DECLARE_mBool(enable_prune_delete_sign_when_base_compaction); DECLARE_mBool(enable_mow_verbose_log); +DECLARE_mBool(enable_oss_jindofs); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index d43cfae1c28..591d461d793 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -232,37 +232,39 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r void HdfsFileReader::_collect_profile_before_close() { if (_profile != nullptr && is_hdfs(_fs_name)) { #ifdef USE_HADOOP_HDFS - struct hdfsReadStatistics* hdfs_statistics = nullptr; - auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); - if (r != 0) { - LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r - << ", name node: " << _fs_name; - return; - } - COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, - hdfs_statistics->totalShortCircuitBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read, - hdfs_statistics->totalZeroCopyBytesRead); - hdfsFileFreeReadStatistics(hdfs_statistics); + if (!config::enable_oss_jindofs) { + struct hdfsReadStatistics* hdfs_statistics = nullptr; + auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); + if (r != 0) { + LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r + << ", name node: " << _fs_name; + return; + } + COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); + 
COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, + hdfs_statistics->totalShortCircuitBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read, + hdfs_statistics->totalZeroCopyBytesRead); + hdfsFileFreeReadStatistics(hdfs_statistics); - struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr; - r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics); - if (r != 0) { - LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r - << ", name node: " << _fs_name; - return; - } + struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr; + r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics); + if (r != 0) { + LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r + << ", name node: " << _fs_name; + return; + } - COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps); - COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread, - hdfs_hedged_read_statistics->hedgedReadOpsInCurThread); - COUNTER_UPDATE(_hdfs_profile.hedged_read_wins, - hdfs_hedged_read_statistics->hedgedReadOpsWin); + COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps); + COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread, + hdfs_hedged_read_statistics->hedgedReadOpsInCurThread); + COUNTER_UPDATE(_hdfs_profile.hedged_read_wins, + hdfs_hedged_read_statistics->hedgedReadOpsWin); - hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics); - hdfsFileClearReadStatistics(_handle->file()); + hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics); + hdfsFileClearReadStatistics(_handle->file()); + } #endif } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 4ca8f9605a0..cd57fc645cc 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -20,6 +20,7 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.property.constants.CosProperties; import org.apache.doris.datasource.property.constants.ObsProperties; @@ -52,6 +53,8 @@ public class LocationPath { private static final Logger LOG = LogManager.getLogger(LocationPath.class); private static final String SCHEME_DELIM = "://"; private static final String NONSTANDARD_SCHEME_DELIM = ":/"; + private static String STANDARD_HDFS_PREFIX = "hdfs://"; + private static String BROKEN_HDFS_PREFIX = "hdfs:/"; private final Scheme scheme; private final String location; private final boolean isBindBroker; @@ -106,7 +109,9 @@ public class LocationPath { this.scheme = Scheme.HDFS; // Need add hdfs host to location String host = props.get(HdfsResource.DSF_NAMESERVICES); - tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host) : tmpLocation; + boolean enableOssRootPolicy = props.getOrDefault(ExternalCatalog.OOS_ROOT_POLICY, "false") + .equals("true"); + tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host, enableOssRootPolicy) : tmpLocation; break; case FeConstants.FS_PREFIX_S3: this.scheme = Scheme.S3; @@ -364,7 +369,8 @@ public class LocationPath { return pos; } - private static String normalizedHdfsPath(String location, String host) { + @VisibleForTesting + public static String normalizedHdfsPath(String location, String host, boolean enableOssRootPolicy) { try { // Hive partition may contain special characters such as ' ', '<', '>' and so on. // Need to encode these characters before creating URI. 
@@ -375,23 +381,28 @@ public class LocationPath { // compatible with 'hdfs:///' or 'hdfs:/' if (StringUtils.isEmpty(normalizedUri.getHost())) { location = URLDecoder.decode(location, StandardCharsets.UTF_8.name()); - String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//"; - String brokenPrefix = HdfsResource.HDFS_PREFIX + "/"; - if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) { - location = location.replace(brokenPrefix, normalizedPrefix); + if (location.startsWith(BROKEN_HDFS_PREFIX) && !location.startsWith(STANDARD_HDFS_PREFIX)) { + location = location.replace(BROKEN_HDFS_PREFIX, STANDARD_HDFS_PREFIX); } if (StringUtils.isNotEmpty(host)) { // Replace 'hdfs://key/' to 'hdfs://name_service/key/' // Or hdfs:///abc to hdfs://name_service/abc - return location.replace(normalizedPrefix, normalizedPrefix + host + "/"); + return location.replace(STANDARD_HDFS_PREFIX, STANDARD_HDFS_PREFIX + host + "/"); } else { // 'hdfs://null/' equals the 'hdfs:///' if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) { // Do not support hdfs:///location throw new RuntimeException("Invalid location with empty host: " + location); } else { - // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE. - return location.replace(normalizedPrefix, "/"); + if (enableOssRootPolicy) { + // if oss root policy is enabled, the path should be like: + // hdfs://customized_host/path/to/file + // Should remain unchanged. + return location; + } else { + // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE. 
+ return location.replace(STANDARD_HDFS_PREFIX, "/"); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 071accf0e77..6164b9394ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -120,6 +120,9 @@ public abstract class ExternalCatalog public static final String FOUND_CONFLICTING = "Found conflicting"; public static final String ONLY_TEST_LOWER_CASE_TABLE_NAMES = "only_test_lower_case_table_names"; + // https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/use-rootpolicy-to-access-oss-hdfs?spm=a2c4g.11186623.help-menu-search-28066.d_0 + public static final String OOS_ROOT_POLICY = "oss.root_policy"; + // Properties that should not be shown in the `show create catalog` result public static final Set<String> HIDDEN_PROPERTIES = Sets.newHashSet( CREATE_TIME, diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 4457b7dd1ef..93a46a2dfa0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -217,4 +217,78 @@ public class LocationPathTest { LocationPath p3 = new LocationPath("file://authority/abc/def", props); Assertions.assertEquals(Scheme.LOCAL, p3.getScheme()); } + + @Test + public void testNormalizedHdfsPath() { + // Test case 1: Path with special characters that need encoding + // Input: Path with spaces and special characters + // Expected: Characters are properly encoded while preserving / and : + String location = "hdfs://namenode/path with spaces/<special>chars"; + String host = ""; + boolean enableOssRootPolicy = false; + String result = LocationPath.normalizedHdfsPath(location, host, 
enableOssRootPolicy); + Assertions.assertEquals("hdfs://namenode/path with spaces/<special>chars", result); + + // Test case 2: Empty host in URI with host parameter provided + // Input: hdfs:///, host = nameservice + // Expected: hdfs://nameservice//path/to/file (note the double slash) + location = "hdfs:///path/to/file"; + host = "nameservice"; + result = LocationPath.normalizedHdfsPath(location, host, false); + Assertions.assertEquals("hdfs://nameservice//path/to/file", result); + + // Test case 3: Broken prefix case (hdfs:/ instead of hdfs://) + // Input: hdfs:/path, host = nameservice + // Expected: hdfs://nameservice/path + location = "hdfs:/path/to/file"; + host = "nameservice"; + result = LocationPath.normalizedHdfsPath(location, host, false); + Assertions.assertEquals("hdfs://nameservice/path/to/file", result); + + // Test case 4: Empty host parameter with enableOssRootPolicy=true + // Input: hdfs://customized_host/path + // Expected: hdfs://customized_host/path (unchanged) + location = "hdfs://customized_host/path/to/file"; + host = ""; + result = LocationPath.normalizedHdfsPath(location, host, true); + Assertions.assertEquals("hdfs://customized_host/path/to/file", result); + + // Test case 5: Empty host parameter with enableOssRootPolicy=false + // Input: hdfs://host/path + // Expected: /host/path (the URI authority becomes part of the local path) + location = "hdfs://customized_host/path/to/file"; + host = ""; + result = LocationPath.normalizedHdfsPath(location, host, false); + Assertions.assertEquals("/customized_host/path/to/file", result); + + // Test case 6: hdfs:/// with empty host parameter + // Input: hdfs:///path + // Expected: Exception since this format is not supported + location = "hdfs:///path/to/file"; + host = ""; + boolean exceptionThrown = false; + try { + LocationPath.normalizedHdfsPath(location, host, false); + } catch (RuntimeException e) { + exceptionThrown = true; + Assertions.assertTrue(e.getMessage().contains("Invalid location with empty host")); + } + Assertions.assertTrue(exceptionThrown); + + // Test case 7: Non-empty 
host in URI (regular case) + // Input: hdfs://existinghost/path + // Expected: hdfs://existinghost/path (unchanged) + location = "hdfs://existinghost/path/to/file"; + host = "nameservice"; + result = LocationPath.normalizedHdfsPath(location, host, false); + Assertions.assertEquals("hdfs://existinghost/path/to/file", result); + + // Test case 8: No valid host name + // Input: hdfs://hdfs_host/path + // Expected: hdfs://nameservice/hdfs_host/path ('hdfs_host' is not a valid URI host, so the name service is prepended) + location = "hdfs://hdfs_host/path/to/file"; + host = "nameservice"; + result = LocationPath.normalizedHdfsPath(location, host, false); + Assertions.assertEquals("hdfs://nameservice/hdfs_host/path/to/file", result); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org