This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 6be6fcc7e78 [BugFix](MultiCatalog) Fix oss file location is not avaiable in iceberg hadoop catalog (#30761) (#30992) 6be6fcc7e78 is described below commit 6be6fcc7e7870e6013245f3f30350fee43a25819 Author: Kang <kxiao.ti...@gmail.com> AuthorDate: Thu Feb 8 16:43:07 2024 +0800 [BugFix](MultiCatalog) Fix oss file location is not avaiable in iceberg hadoop catalog (#30761) (#30992) --- .../org/apache/doris/common/util/LocationPath.java | 191 ++++++++++++--------- .../planner/external/iceberg/IcebergScanNode.java | 9 +- .../apache/doris/common/util/LocationPathTest.java | 14 ++ 3 files changed, 127 insertions(+), 87 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 0ddba406cdc..fd7da29e519 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -39,6 +39,8 @@ import java.net.URISyntaxException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.nio.file.InvalidPathException; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -49,7 +51,7 @@ public class LocationPath { private final LocationType locationType; private final String location; - enum LocationType { + public enum LocationType { HDFS, LOCAL, // Local File BOS, // Baidu @@ -66,7 +68,8 @@ public class LocationPath { S3A, S3N, VIEWFS, - UNKNOWN + UNKNOWN, + NOSCHEME // no scheme info } private LocationPath(String location) { @@ -75,107 +78,123 @@ public class LocationPath { public LocationPath(String location, Map<String, String> props) { String scheme = parseScheme(location).toLowerCase(); - switch (scheme) { - case FeConstants.FS_PREFIX_HDFS: - locationType = LocationType.HDFS; - // Need add hdfs host to location - 
String host = props.get(HdfsResource.DSF_NAMESERVICES); - this.location = normalizedHdfsPath(location, host); - break; - case FeConstants.FS_PREFIX_S3: - locationType = LocationType.S3; - this.location = location; - break; - case FeConstants.FS_PREFIX_S3A: - locationType = LocationType.S3A; - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_S3N: - // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. - locationType = LocationType.S3N; - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_BOS: - locationType = LocationType.BOS; - // use s3 client to access - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_GCS: - locationType = LocationType.GCS; - // use s3 client to access - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_OSS: - if (isHdfsOnOssEndpoint(location)) { - locationType = LocationType.OSS_HDFS; + if (scheme.isEmpty()) { + locationType = LocationType.NOSCHEME; + this.location = location; + } else { + switch (scheme) { + case FeConstants.FS_PREFIX_HDFS: + locationType = LocationType.HDFS; + // Need add hdfs host to location + String host = props.get(HdfsResource.DSF_NAMESERVICES); + this.location = normalizedHdfsPath(location, host); + break; + case FeConstants.FS_PREFIX_S3: + locationType = LocationType.S3; this.location = location; - } else { + break; + case FeConstants.FS_PREFIX_S3A: + locationType = LocationType.S3A; + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_S3N: + // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. 
+ locationType = LocationType.S3N; + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_BOS: + locationType = LocationType.BOS; + // use s3 client to access + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_GCS: + locationType = LocationType.GCS; + // use s3 client to access + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_OSS: + if (isHdfsOnOssEndpoint(location)) { + locationType = LocationType.OSS_HDFS; + this.location = location; + } else { + if (useS3EndPoint(props)) { + this.location = convertToS3(location); + } else { + this.location = location; + } + locationType = LocationType.OSS; + } + break; + case FeConstants.FS_PREFIX_COS: if (useS3EndPoint(props)) { this.location = convertToS3(location); } else { this.location = location; } - locationType = LocationType.OSS; - } - break; - case FeConstants.FS_PREFIX_COS: - if (useS3EndPoint(props)) { - this.location = convertToS3(location); - } else { + locationType = LocationType.COS; + break; + case FeConstants.FS_PREFIX_OBS: + if (useS3EndPoint(props)) { + this.location = convertToS3(location); + } else { + this.location = location; + } + locationType = LocationType.OBS; + break; + case FeConstants.FS_PREFIX_OFS: + locationType = LocationType.OFS; this.location = location; - } - locationType = LocationType.COS; - break; - case FeConstants.FS_PREFIX_OBS: - if (useS3EndPoint(props)) { - this.location = convertToS3(location); - } else { + break; + case FeConstants.FS_PREFIX_JFS: + locationType = LocationType.JFS; this.location = location; - } - locationType = LocationType.OBS; - break; - case FeConstants.FS_PREFIX_OFS: - locationType = LocationType.OFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_JFS: - locationType = LocationType.JFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_GFS: - locationType = LocationType.GFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_COSN: - 
// if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues - locationType = LocationType.COSN; - this.location = location; - break; - case FeConstants.FS_PREFIX_VIEWFS: - locationType = LocationType.VIEWFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_FILE: - locationType = LocationType.LOCAL; - this.location = location; - break; - default: - locationType = LocationType.UNKNOWN; - this.location = location; + break; + case FeConstants.FS_PREFIX_GFS: + locationType = LocationType.GFS; + this.location = location; + break; + case FeConstants.FS_PREFIX_COSN: + // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues + locationType = LocationType.COSN; + this.location = location; + break; + case FeConstants.FS_PREFIX_VIEWFS: + locationType = LocationType.VIEWFS; + this.location = location; + break; + case FeConstants.FS_PREFIX_FILE: + locationType = LocationType.LOCAL; + this.location = location; + break; + default: + locationType = LocationType.UNKNOWN; + this.location = location; + } } } private static String parseScheme(String location) { + String scheme = ""; String[] schemeSplit = location.split(SCHEME_DELIM); if (schemeSplit.length > 1) { - return schemeSplit[0]; + scheme = schemeSplit[0]; } else { schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM); if (schemeSplit.length > 1) { - return schemeSplit[0]; + scheme = schemeSplit[0]; } - throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location); } + + // if not get scheme, need consider /path/to/local to no scheme + if (scheme.isEmpty()) { + try { + Paths.get(location); + } catch (InvalidPathException exception) { + throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location); + } + } + + return scheme; } private boolean useS3EndPoint(Map<String, String> props) { @@ -196,6 +215,7 @@ public class LocationPath { /** * The converted path is used for FE to get metadata + * * 
@param location origin location * @return metadata location path. just convert when storage is compatible with s3 client. */ @@ -219,7 +239,7 @@ public class LocationPath { // Need to encode these characters before creating URI. // But doesn't encode '/' and ':' so that we can get the correct uri host. location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()) - .replace("%2F", "/").replace("%3A", ":"); + .replace("%2F", "/").replace("%3A", ":"); URI normalizedUri = new URI(location); // compatible with 'hdfs:///' or 'hdfs:/' if (StringUtils.isEmpty(normalizedUri.getHost())) { @@ -336,6 +356,7 @@ public class LocationPath { /** * The converted path is used for BE + * * @return BE scan range path */ public Path toScanRangeLocation() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index d88d18d91af..dbd1aab2392 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -353,11 +353,16 @@ public class IcebergScanNode extends FileQueryScanNode { private String normalizeLocation(String location) { Map<String, String> props = source.getCatalog().getProperties(); + LocationPath locationPath = new LocationPath(location, props); String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE); if ("hadoop".equalsIgnoreCase(icebergCatalogType)) { - if (!location.startsWith(HdfsResource.HDFS_PREFIX)) { + // if the location has no scheme, prefix it with HADOOP_FS_NAME + // if HADOOP_FS_NAME is not set, keep the location as-is (treated as a local file system path) + if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) { String fsName = props.get(HdfsResource.HADOOP_FS_NAME); - location = fsName + location; + if (fsName != null) { + location = fsName + location; + } } } return location; diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 71ee9100ffc..571826aa9c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -171,8 +171,22 @@ public class LocationPathTest { LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps); // FE Assertions.assertTrue(locationPath.get().startsWith("unknown://")); + Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.UNKNOWN); // BE String beLocation = locationPath.toScanRangeLocation().toString(); Assertions.assertTrue(beLocation.startsWith("unknown://")); } + + @Test + public void testNoSchemeLocation() { + // a location without a scheme is classified as NOSCHEME and passed to BE unchanged + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("/path/to/local", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local")); + Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local")); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org