This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e431c2b9801 [Improvement](multi-catalog)make location easier to modify, decoupling all storage with a single location class (#27874)
e431c2b9801 is described below

commit e431c2b980174b8a90f0500d59a559903fc36eb8
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Dec 6 00:13:54 2023 +0800

    [Improvement](multi-catalog)make location easier to modify, decoupling all storage with a single location class (#27874)

    decoupling all storage with a single location class
---
 .../java/org/apache/doris/common/FeConstants.java  |  30 +-
 .../org/apache/doris/common/util/LocationPath.java | 380 +++++++++++++++++++++
 .../java/org/apache/doris/common/util/S3Util.java  | 138 --------
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  34 +-
 .../datasource/property/PropertyConverter.java     |   6 +-
 .../org/apache/doris/fs/FileSystemFactory.java     |  34 --
 .../doris/planner/external/FileQueryScanNode.java  |  32 --
 .../doris/planner/external/HiveScanNode.java       |   4 +-
 .../doris/planner/external/hudi/HudiScanNode.java  |  11 +-
 .../planner/external/iceberg/IcebergScanNode.java  |  12 +-
 .../apache/doris/common/util/LocationPathTest.java | 178 ++++++++++
 .../org/apache/doris/common/util/S3UtilTest.java   | 104 ------
 12 files changed, 609 insertions(+), 354 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 9d264b0c0ab..b9604009bed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -64,21 +64,21 @@ public class FeConstants {
     // use \N to indicate NULL
     public static String null_string = "\\N";
 
-    public static String FS_PREFIX_S3 = "s3";
-    public static String FS_PREFIX_S3A = "s3a";
-    public static String FS_PREFIX_S3N = "s3n";
-    public static String FS_PREFIX_OSS = "oss";
-    public static String FS_PREFIX_GCS = "gs";
-    public static String FS_PREFIX_BOS = "bos";
-    public static String FS_PREFIX_COS = "cos";
-    public static String FS_PREFIX_COSN = "cosn";
-    public static String FS_PREFIX_OBS = "obs";
-    public static String FS_PREFIX_OFS = "ofs";
-    public static String FS_PREFIX_GFS = "gfs";
-    public static String FS_PREFIX_JFS = "jfs";
-    public static String FS_PREFIX_HDFS = "hdfs";
-    public static String FS_PREFIX_VIEWFS = "viewfs";
-    public static String FS_PREFIX_FILE = "file";
+    public static final String FS_PREFIX_S3 = "s3";
+    public static final String FS_PREFIX_S3A = "s3a";
+    public static final String FS_PREFIX_S3N = "s3n";
+    public static final String FS_PREFIX_OSS = "oss";
+    public static final String FS_PREFIX_GCS = "gs";
+    public static final String FS_PREFIX_BOS = "bos";
+    public static final String FS_PREFIX_COS = "cos";
+    public static final String FS_PREFIX_COSN = "cosn";
+    public static final String FS_PREFIX_OBS = "obs";
+    public static final String FS_PREFIX_OFS = "ofs";
+    public static final String FS_PREFIX_GFS = "gfs";
+    public static final String FS_PREFIX_JFS = "jfs";
+    public static final String FS_PREFIX_HDFS = "hdfs";
+    public static final String FS_PREFIX_VIEWFS = "viewfs";
+    public static final String FS_PREFIX_FILE = "file";
 
     public static final String INTERNAL_DB_NAME = "__internal_schema";
     public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
new file mode 100644
index 00000000000..d56e67bb0d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.property.constants.CosProperties;
+import org.apache.doris.datasource.property.constants.ObsProperties;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPath {
+    private static final Logger LOG = LogManager.getLogger(LocationPath.class);
+    private static final String SCHEME_DELIM = "://";
+    private static final String NONSTANDARD_SCHEME_DELIM = ":/";
+    private final LocationType locationType;
+    private final String location;
+
+    enum LocationType {
+        HDFS,
+        LOCAL, // Local File
+        BOS, // Baidu
+        GCS, // Google
+        OBS, // Huawei
+        COS, // Tencent
+        COSN, // Tencent
+        OFS, // Tencent CHDFS
+        GFS, // Tencent GooseFs
+        OSS, // Alibaba
+        OSS_HDFS, // JindoFS on OSS
+        JFS, // JuiceFS
+        S3,
+        S3A,
+        S3N,
+        VIEWFS,
+        UNKNOWN
+    }
+
+    private LocationPath(String location) {
+        this(location, new HashMap<>());
+    }
+
+    public LocationPath(String location, Map<String, String> props) {
+        String scheme = parseScheme(location).toLowerCase();
+        switch (scheme) {
+            case FeConstants.FS_PREFIX_HDFS:
+                locationType = LocationType.HDFS;
+                // Need to add the hdfs host to the location
+                String host = props.get(HdfsResource.DSF_NAMESERVICES);
+                this.location = normalizedHdfsPath(location, host);
+                break;
+            case FeConstants.FS_PREFIX_S3:
+                locationType = LocationType.S3;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_S3A:
+                locationType = LocationType.S3A;
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_S3N:
+                // a single table may mix multiple location types, e.g. both s3 and hdfs locations in one table
+                locationType = LocationType.S3N;
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_BOS:
+                locationType = LocationType.BOS;
+                // use s3 client to access
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_GCS:
+                locationType = LocationType.GCS;
+                // use s3 client to access
+                this.location = convertToS3(location);
+                break;
+            case FeConstants.FS_PREFIX_OSS:
+                if (isHdfsOnOssEndpoint(location)) {
+                    locationType = LocationType.OSS_HDFS;
+                    this.location = location;
+                } else {
+                    if (useS3EndPoint(props)) {
+                        this.location = convertToS3(location);
+                    } else {
+                        this.location = location;
+                    }
+                    locationType = LocationType.OSS;
+                }
+                break;
+            case FeConstants.FS_PREFIX_COS:
+                if (useS3EndPoint(props)) {
+                    this.location = convertToS3(location);
+                } else {
+                    this.location = location;
+                }
+                locationType = LocationType.COS;
+                break;
+            case FeConstants.FS_PREFIX_OBS:
+                if (useS3EndPoint(props)) {
+                    this.location = convertToS3(location);
+                } else {
+                    this.location = location;
+                }
+                locationType = LocationType.OBS;
+                break;
+            case FeConstants.FS_PREFIX_OFS:
+                locationType = LocationType.OFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_JFS:
+                locationType = LocationType.JFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_GFS:
+                locationType = LocationType.GFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_COSN:
+                // treating cosn (tencent hadoop-cos) as an s3 file system may bring incompatibility issues
+                locationType = LocationType.COSN;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_VIEWFS:
+                locationType = LocationType.VIEWFS;
+                this.location = location;
+                break;
+            case FeConstants.FS_PREFIX_FILE:
+                locationType = LocationType.LOCAL;
+                this.location = location;
+                break;
+            default:
+                locationType = LocationType.UNKNOWN;
+                this.location = location;
+        }
+    }
+
+    private static String parseScheme(String location) {
+        String[] schemeSplit = location.split(SCHEME_DELIM);
+        if (schemeSplit.length > 1) {
+            return schemeSplit[0];
+        } else {
+            schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
+            if (schemeSplit.length > 1) {
+                return schemeSplit[0];
+            }
+            throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
+        }
+    }
+
+    private boolean useS3EndPoint(Map<String, String> props) {
+        if (props.containsKey(ObsProperties.ENDPOINT)
+                || props.containsKey(OssProperties.ENDPOINT)
+                || props.containsKey(CosProperties.ENDPOINT)) {
+            return false;
+        }
+        // check a wide range for compatibility with s3 properties
+        return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
+    }
+
+    public static boolean isHdfsOnOssEndpoint(String location) {
+        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
+        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
+        return location.contains("oss-dls.aliyuncs");
+    }
+
+    /**
+     * The converted path is used by FE to get metadata.
+     * @param location the original location
+     * @return the metadata location path; only converted when the storage is compatible with the s3 client
+     */
+    private static String convertToS3(String location) {
+        LOG.debug("try convert location to s3 prefix: " + location);
+        int pos = findDomainPos(location);
+        return "s3" + location.substring(pos);
+    }
+
+    private static int findDomainPos(String rangeLocation) {
+        int pos = rangeLocation.indexOf("://");
+        if (pos == -1) {
+            throw new RuntimeException("No '://' found in location: " + rangeLocation);
+        }
+        return pos;
+    }
+
+    private static String normalizedHdfsPath(String location, String host) {
+        try {
+            // Hive partition may contain special characters such as ' ', '<', '>' and so on.
+            // Need to encode these characters before creating URI.
+            // But don't encode '/' and ':' so that we can get the correct uri host.
+            location = URLEncoder.encode(location, StandardCharsets.UTF_8.name())
+                    .replace("%2F", "/").replace("%3A", ":");
+            URI normalizedUri = new URI(location);
+            // compatible with 'hdfs:///' or 'hdfs:/'
+            if (StringUtils.isEmpty(normalizedUri.getHost())) {
+                location = URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+                String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+                String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+                if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
+                    location = location.replace(brokenPrefix, normalizedPrefix);
+                }
+                if (StringUtils.isNotEmpty(host)) {
+                    // Replace 'hdfs://key/' with 'hdfs://name_service/key/',
+                    // or 'hdfs:///abc' with 'hdfs://name_service/abc'
+                    return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
+                } else {
+                    // 'hdfs://null/' is equivalent to 'hdfs:///'
+                    if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
+                        // Do not support hdfs:///location
+                        throw new RuntimeException("Invalid location with empty host: " + location);
+                    } else {
+                        // Replace 'hdfs://key/' with '/key/', try to access the local NameNode on BE.
+                        return location.replace(normalizedPrefix, "/");
+                    }
+                }
+            }
+            return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+        } catch (URISyntaxException | UnsupportedEncodingException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
+        LocationPath locationPath = new LocationPath(location);
+        FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
+        URI uri = locationPath.getPath().toUri();
+        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
+        return Pair.of(fsType, fsIdent);
+    }
+
+    private FileSystemType getFileSystemType() {
+        FileSystemType fsType;
+        switch (locationType) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // All of these storages are accessed with the s3 client on BE, so convert to s3
+                fsType = FileSystemType.S3;
+                break;
+            case COSN:
+            case OFS:
+                // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS
+                fsType = FileSystemType.OFS;
+                break;
+            case HDFS:
+            case OSS_HDFS: // if the hdfs service is enabled on oss, use the hdfs lib to access oss
+            case VIEWFS:
+            case GFS:
+                fsType = FileSystemType.DFS;
+                break;
+            case JFS:
+                fsType = FileSystemType.JFS;
+                break;
+            case LOCAL:
+                fsType = FileSystemType.FILE;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown file system for location: " + location);
+        }
+        return fsType;
+    }
+
+    /**
+     * Provide the file type for BE.
+     * @param location the location from fs.listFile
+     * @return the TFileType that BE uses to choose a suitable client to access the storage
+     */
+    public static TFileType getTFileType(String location) {
+        if (location == null || location.isEmpty()) {
+            return null;
+        }
+        LocationPath locationPath = new LocationPath(location);
+        switch (locationPath.getLocationType()) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // for now we only support the S3 client for object storage on BE
+                return TFileType.FILE_S3;
+            case HDFS:
+            case OSS_HDFS: // if the hdfs service is enabled on oss, use the hdfs lib to access oss
+            case VIEWFS:
+            case COSN:
+                return TFileType.FILE_HDFS;
+            case GFS:
+            case JFS:
+            case OFS:
+                return TFileType.FILE_BROKER;
+            case LOCAL:
+                return TFileType.FILE_LOCAL;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * The converted path is used by BE.
+     * @return BE scan range path
+     */
+    public Path toScanRangeLocation() {
+        switch (locationType) {
+            case S3:
+            case S3A:
+            case S3N:
+            case COS:
+            case OSS:
+            case OBS:
+            case BOS:
+            case GCS:
+                // All of these storages are accessed with the s3 client on BE, so convert to s3
+                return new Path(convertToS3(location));
+            case HDFS:
+            case OSS_HDFS:
+            case VIEWFS:
+            case COSN:
+            case GFS:
+            case JFS:
+            case OFS:
+            case LOCAL:
+            default:
+                return getPath();
+        }
+    }
+
+    public LocationType getLocationType() {
+        return locationType;
+    }
+
+    public String get() {
+        return location;
+    }
+
+    public Path getPath() {
+        return new Path(location);
+    }
+
+    @Override
+    public String toString() {
+        return get();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 98790bc9e83..2d40af321fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,18 +17,8 @@ package org.apache.doris.common.util;
 
-import org.apache.doris.catalog.HdfsResource;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.credentials.CloudCredential;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.S3Properties;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -43,138 +33,10 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3Configuration;
 
-import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.Map;
 
 public class S3Util {
-    private static final Logger LOG = LogManager.getLogger(S3Util.class);
-
-    public static boolean isObjStorage(String location) {
-        return isObjStorageUseS3Client(location)
-                // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
-                ||
(location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN)) - || location.startsWith(FeConstants.FS_PREFIX_OSS) - || location.startsWith(FeConstants.FS_PREFIX_OBS); - } - - private static boolean isObjStorageUseS3Client(String location) { - return location.startsWith(FeConstants.FS_PREFIX_S3) - || location.startsWith(FeConstants.FS_PREFIX_S3A) - || location.startsWith(FeConstants.FS_PREFIX_S3N) - || location.startsWith(FeConstants.FS_PREFIX_GCS) - || location.startsWith(FeConstants.FS_PREFIX_BOS); - } - - private static boolean isS3EndPoint(String location, Map<String, String> props) { - if (props.containsKey(ObsProperties.ENDPOINT) - || props.containsKey(OssProperties.ENDPOINT) - || props.containsKey(CosProperties.ENDPOINT)) { - return false; - } - // wide check range for the compatibility of s3 properties - return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT)) - && isObjStorage(location); - } - - /** - * The converted path is used for FE to get metadata - * @param location origin location - * @return metadata location path. just convert when storage is compatible with s3 client. - */ - public static String convertToS3IfNecessary(String location, Map<String, String> props) { - LOG.debug("try convert location to s3 prefix: " + location); - // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. - if (isS3EndPoint(location, props) || isObjStorageUseS3Client(location)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new RuntimeException("No '://' found in location: " + location); - } - return "s3" + location.substring(pos); - } - return normalizedLocation(location, props); - } - - private static String normalizedLocation(String location, Map<String, String> props) { - try { - if (location.startsWith(HdfsResource.HDFS_PREFIX)) { - return normalizedHdfsPath(location, props); - } - return location; - } catch (URISyntaxException | UnsupportedEncodingException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - private static String normalizedHdfsPath(String location, Map<String, String> props) - throws URISyntaxException, UnsupportedEncodingException { - // Hive partition may contain special characters such as ' ', '<', '>' and so on. - // Need to encode these characters before creating URI. - // But doesn't encode '/' and ':' so that we can get the correct uri host. 
- location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()).replace("%2F", "/").replace("%3A", ":"); - URI normalizedUri = new URI(location); - // compatible with 'hdfs:///' or 'hdfs:/' - if (StringUtils.isEmpty(normalizedUri.getHost())) { - location = URLDecoder.decode(location, StandardCharsets.UTF_8.name()); - String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//"; - String brokenPrefix = HdfsResource.HDFS_PREFIX + "/"; - if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) { - location = location.replace(brokenPrefix, normalizedPrefix); - } - // Need add hdfs host to location - String host = props.get(HdfsResource.DSF_NAMESERVICES); - if (StringUtils.isNotEmpty(host)) { - // Replace 'hdfs://key/' to 'hdfs://name_service/key/' - // Or hdfs:///abc to hdfs://name_service/abc - // TODO: check host in path when the 'dfs.nameservices' has multiple hosts - return location.replace(normalizedPrefix, normalizedPrefix + host + "/"); - } else { - // 'hdfs://null/' equals the 'hdfs:///' - if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) { - // Do not support hdfs:///location - throw new RuntimeException("Invalid location with empty host: " + location); - } else { - // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE. - return location.replace(normalizedPrefix, "/"); - } - } - } - return URLDecoder.decode(location, StandardCharsets.UTF_8.name()); - } - - /** - * The converted path is used for BE - * @param location origin split path - * @return BE scan range path - */ - public static Path toScanRangeLocation(String location, Map<String, String> props) { - // All storage will use s3 client on BE. - if (isObjStorage(location)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new RuntimeException("No '://' found in location: " + location); - } - if (isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use oss location - // example: oss://examplebucket.cn-shanghai.oss-dls.aliyuncs.com/dir/file/0000.orc - location = "oss" + location.substring(pos); - } else { - location = "s3" + location.substring(pos); - } - } - return new Path(normalizedLocation(location, props)); - } - - public static boolean isHdfsOnOssEndpoint(String location) { - // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs". 
-        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
-        return location.contains("oss-dls.aliyuncs");
-    }
 
     public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
         StaticCredentialsProvider scp;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0ab5179ffa5..9ceb0066d30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -34,14 +34,13 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.CacheBulkLoader;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.fs.FileSystemCache;
-import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.RemoteFiles;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -361,7 +360,7 @@ public class HiveMetaStoreCache {
             String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+                new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
                         location, bindBrokerName), jobConf, bindBrokerName));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
@@ -374,9 +373,10 @@ public class HiveMetaStoreCache {
             // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
             RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
             for (RemoteFile remoteFile : locatedFiles.files()) {
-                Path srcPath = remoteFile.getPath();
-                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
-                if (!convertedPath.toString().equals(srcPath.toString())) {
+                String srcPath = remoteFile.getPath().toString();
+                LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
+                Path convertedPath = locationPath.toScanRangeLocation();
+                if (!convertedPath.toString().equals(srcPath)) {
                     remoteFile.setPath(convertedPath);
                 }
                 result.addFile(remoteFile);
@@ -400,13 +400,12 @@ public class HiveMetaStoreCache {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
-            String finalLocation = S3Util.convertToS3IfNecessary(key.location,
-                    catalog.getCatalogProperty().getProperties());
+            Map<String, String> props = catalog.getCatalogProperty().getProperties();
+            LocationPath finalLocation = new LocationPath(key.location, props);
             // disable the fs cache in FileSystem, or it will always create a new FileSystem
             // and save it in the cache when calling FileInputFormat.setInputPaths().
try { - Path path = new Path(finalLocation); - URI uri = path.toUri(); + URI uri = finalLocation.getPath().toUri(); if (uri.getScheme() != null) { String scheme = uri.getScheme(); updateJobConf("fs." + scheme + ".impl.disable.cache", "true"); @@ -419,13 +418,13 @@ public class HiveMetaStoreCache { } catch (Exception e) { LOG.warn("unknown scheme in path: " + finalLocation, e); } - FileInputFormat.setInputPaths(jobConf, finalLocation); + FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result; InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false); // TODO: This is a temp config, will remove it after the HiveSplitter is stable. if (key.useSelfSplitter) { - result = getFileCache(finalLocation, inputFormat, jobConf, + result = getFileCache(finalLocation.get(), inputFormat, jobConf, key.getPartitionValues(), key.bindBrokerName); } else { InputSplit[] splits; @@ -442,8 +441,9 @@ public class HiveMetaStoreCache { for (int i = 0; i < splits.length; i++) { org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]); // todo: get modification time - Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(), - catalog.getProperties()); + String dataFilePath = fs.getPath().toString(); + LocationPath locationPath = new LocationPath(dataFilePath, catalog.getProperties()); + Path splitFilePath = locationPath.toScanRangeLocation(); result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null)); } } @@ -812,7 +812,7 @@ public class HiveMetaStoreCache { String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(), + LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), bindBrokerName), jobConf, bindBrokerName)); Status status = fs.exists(acidVersionPath); if (status != Status.OK) { @@ -835,7 +835,7 @@ public class HiveMetaStoreCache { String location = delta.getPath().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(location, bindBrokerName), + LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); if (delta.isDeleteDelta()) { @@ -855,7 +855,7 @@ public class HiveMetaStoreCache { String location = directory.getBaseDirectory().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(location, bindBrokerName), + LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); locatedFiles.files().stream().filter( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index 1b0e3b6d972..a6cad308839 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -17,7 +17,7 @@ package org.apache.doris.datasource.property; -import 
org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InitCatalogLog.Type; @@ -301,7 +301,7 @@ public class PropertyConverter { ossProperties.put("fs.oss.impl.disable.cache", "true"); ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss")); boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false")); - if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) { + if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) { // use endpoint or enable hdfs rewriteHdfsOnOssProperties(ossProperties, endpoint); } @@ -321,7 +321,7 @@ public class PropertyConverter { } private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties, String endpoint) { - if (!S3Util.isHdfsOnOssEndpoint(endpoint)) { + if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) { // just for robustness here, avoid wrong endpoint when oss-hdfs is enabled. // convert "oss-cn-beijing.aliyuncs.com" to "cn-beijing.oss-dls.aliyuncs.com" // reference link: https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index e54a73bbff3..63f552a8ab8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -18,9 +18,6 @@ package org.apache.doris.fs; import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.Pair; -import org.apache.doris.common.util.S3Util; import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.S3FileSystem; @@ -28,12 +25,10 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.fs.remote.dfs.JFSFileSystem; import org.apache.doris.fs.remote.dfs.OFSFileSystem; -import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -56,35 +51,6 @@ public class FileSystemFactory { } } - public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) { - FileSystemType fsType; - if (bindBrokerName != null) { - fsType = FileSystemType.BROKER; - } else if (S3Util.isObjStorage(location)) { - if (S3Util.isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use hdfs lib to access oss. 
- fsType = FileSystemType.DFS; - } else { - fsType = FileSystemType.S3; - } - } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS) - || location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) { - fsType = FileSystemType.DFS; - } else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) { - // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) { - fsType = FileSystemType.OFS; - } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - fsType = FileSystemType.JFS; - } else { - throw new UnsupportedOperationException("Unknown file system for location: " + location); - } - - Path path = new Path(location); - URI uri = path.toUri(); - String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); - return Pair.of(fsType, fsIdent); - } - public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf, String bindBrokerName) { Map<String, String> properties = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index eac945ced35..ada9f1fda61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -30,11 +30,9 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.common.util.S3Util; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; @@ -83,7 +81,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -492,33 +489,4 @@ public abstract class FileQueryScanNode extends FileScanNode { protected abstract TableIf getTargetTable() throws UserException; protected abstract Map<String, String> getLocationProperties() throws UserException; - - protected static Optional<TFileType> getTFileType(String location) { - if (location != null && !location.isEmpty()) { - if (S3Util.isObjStorage(location)) { - if (S3Util.isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use hdfs lib to access oss. 
- return Optional.of(TFileType.FILE_HDFS); - } - return Optional.of(TFileType.FILE_S3); - } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { - return Optional.of(TFileType.FILE_LOCAL); - } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - return Optional.of(TFileType.FILE_BROKER); - } - } - return Optional.empty(); - } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 943d30017e7..fbb7df4df2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -66,6 +67,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.stream.Collectors; @@ -334,7 +336,7 @@ public class HiveScanNode extends FileQueryScanNode { if (bindBrokerName != null) { return TFileType.FILE_BROKER; } - return getTFileType(location).orElseThrow(() -> + return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() -> new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 3921eb5cb89..f73947262e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -26,7 +26,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; @@ -43,7 +43,6 @@ import org.apache.doris.thrift.THudiFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -287,9 +286,11 @@ public class HudiScanNode extends HiveScanNode { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); long 
fileSize = baseFile.getFileSize();
-                splits.add(new FileSplit(S3Util.toScanRangeLocation(filePath, Maps.newHashMap()),
-                        0, fileSize, fileSize, new String[0],
-                        partition.getPartitionValues()));
+                // Need to add the hdfs host to the location
+                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                Path splitFilePath = locationPath.toScanRangeLocation();
+                splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                        new String[0], partition.getPartitionValues()));
             });
         } else {
             fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index c8a9437e243..0602da4a34d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -31,7 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.catalog.external.IcebergExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
@@ -141,7 +141,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
             TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
             String deleteFilePath = filter.getDeleteFilePath();
-            deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
+            LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig());
+            Path splitDeletePath = locationPath.toScanRangeLocation();
+            deleteFileDesc.setPath(splitDeletePath.toString());
             if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                 fileDesc.setContent(FileContent.POSITION_DELETES.id());
                 IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -221,8 +223,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                 // Counts the number of partitions read
                 partitionPathSet.add(structLike.toString());
             }
-
-            Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
+            LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties());
+            Path finalDataFilePath = locationPath.toScanRangeLocation();
             IcebergSplit split = new IcebergSplit(
                     finalDataFilePath,
                     splitTask.start(),
@@ -323,7 +325,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     public TFileType getLocationType(String location) throws UserException {
         final String fLocation = normalizeLocation(location);
-        return getTFileType(fLocation).orElseThrow(() ->
+        return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
                 new DdlException("Unknown file location " + fLocation
                         + " for iceberg table " + icebergTable.name()));
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
new file mode 100644
index 00000000000..277b6527a4f
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.fs.FileSystemType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class LocationPathTest { + + @Test + public void testHdfsLocationConvert() { + // non HA + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("hdfs://dir/file.path", rangeProps); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); + + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + + // HA props + Map<String, String> props = new HashMap<>(); + props.put("dfs.nameservices", "ns"); + locationPath = new LocationPath("hdfs:///dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://") + && !locationPath.get().startsWith("hdfs:///")); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://") && !beLocation.startsWith("hdfs:///")); + + // nonstandard '/' for hdfs path + locationPath = new LocationPath("hdfs:/dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://")); + + // empty ha nameservices + props.put("dfs.nameservices", ""); + locationPath = new LocationPath("hdfs:/dir/file.path", props); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(locationPath.get().startsWith("/dir") + && !locationPath.get().startsWith("hdfs://")); + Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://")); + } + + + @Test + public void testJFSLocationConvert() { + String loc; + Map<String, String> rangeProps = new HashMap<>(); + + LocationPath locationPath = new LocationPath("jfs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("jfs://")); + // BE + loc = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(loc.startsWith("jfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS); + } + + @Test + public void testGSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + + // use s3 client to access gs + LocationPath locationPath = new LocationPath("gs://test.com", rangeProps); + // FE + 
Assertions.assertTrue(locationPath.get().startsWith("s3://")); + // BE + String beLoc = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLoc.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3); + } + + @Test + public void testOSSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("oss://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("oss://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + + locationPath = new LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + + } + + @Test + public void testCOSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("cos://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("cos://")); + String beLocation = locationPath.toScanRangeLocation().toString(); + // BE + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + + locationPath = new LocationPath("cosn://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("cosn://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("cosn://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS); + + locationPath = new LocationPath("ofs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("ofs://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("ofs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS); + + // GFS is now equals to DFS + locationPath = new LocationPath("gfs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("gfs://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("gfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + } + + @Test + public void testOBSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("obs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("obs://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + } + + @Test + public void testUnsupportedLocationConvert() { + // when use unknown location, pass to BE + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("unknown://test.com", 
rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("unknown://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("unknown://")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java deleted file mode 100644 index 70bad23e01f..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import org.apache.doris.fs.FileSystemFactory; -import org.apache.doris.fs.FileSystemType; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -public class S3UtilTest { - @Test - public void testLocationConvert() { - String loc; - loc = S3Util.convertToS3IfNecessary("hdfs://dir/file.path", new HashMap<>()); - Assertions.assertTrue(loc.startsWith("hdfs://")); - - Map<String, String> props = new HashMap<>(); - props.put("dfs.nameservices", "ns"); - loc = S3Util.convertToS3IfNecessary("hdfs:///dir/file.path", props); - Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///")); - loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props); - Assertions.assertTrue(loc.startsWith("hdfs://")); - props.put("dfs.nameservices", ""); - loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props); - Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://")); - - loc = S3Util.convertToS3IfNecessary("oss://test.com", props); - Assertions.assertTrue(loc.startsWith("oss://")); - - loc = S3Util.convertToS3IfNecessary("gcs://test.com", props); - Assertions.assertTrue(loc.startsWith("gcs://")); - - loc = S3Util.convertToS3IfNecessary("cos://test.com", props); - Assertions.assertTrue(loc.startsWith("cos://")); - - loc = S3Util.convertToS3IfNecessary("cosn://test.com", props); - Assertions.assertTrue(loc.startsWith("cosn://")); - - loc = S3Util.convertToS3IfNecessary("obs://test.com", props); - Assertions.assertTrue(loc.startsWith("obs://")); - } - - - @Test - public void testScanRangeLocationConvert() throws Exception { - String loc; - Map<String, String> rangeProps = new HashMap<>(); - loc = S3Util.toScanRangeLocation("hdfs://dir/file.path", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("hdfs://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS); - - Map<String, String> props = new HashMap<>(); - props.put("dfs.nameservices", "ns"); - loc = S3Util.toScanRangeLocation("hdfs:///dir/file.path", props).toString(); - 
Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///")); - loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString(); - Assertions.assertTrue(loc.startsWith("hdfs://")); - props.put("dfs.nameservices", ""); - loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString(); - Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://")); - - loc = S3Util.toScanRangeLocation("oss://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("oss://test.oss-dls.aliyuncs.com/path", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("oss://test.oss-dls.aliyuncs")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS); - - loc = S3Util.toScanRangeLocation("cos://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("cosn://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("cosn://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.OFS); - - loc = S3Util.toScanRangeLocation("obs://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("unknown://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("unknown://")); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org