This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 944c63c4e74 [fix](cosn) use s3 client to read cosn on BE side (#30835) (#30881) 944c63c4e74 is described below commit 944c63c4e74cef9af31ab5aa4b35c28ada427263 Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Feb 6 09:57:07 2024 +0800 [fix](cosn) use s3 client to read cosn on BE side (#30835) (#30881) bp #30835 #27874 --- .../org/apache/doris/common/util/LocationPath.java | 8 +- .../java/org/apache/doris/common/util/S3Util.java | 138 ---------------- .../doris/datasource/hive/HiveMetaStoreCache.java | 34 ++-- .../datasource/property/PropertyConverter.java | 6 +- .../org/apache/doris/fs/FileSystemFactory.java | 34 ---- .../doris/planner/external/FileQueryScanNode.java | 32 ---- .../doris/planner/external/HiveScanNode.java | 6 +- .../doris/planner/external/hudi/HudiScanNode.java | 11 +- .../planner/external/iceberg/IcebergScanNode.java | 12 +- .../planner/external/paimon/PaimonScanNode.java | 4 +- .../apache/doris/common/util/LocationPathTest.java | 178 +++++++++++++++++++++ .../org/apache/doris/common/util/S3UtilTest.java | 104 ------------ 12 files changed, 222 insertions(+), 345 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index d56e67bb0d1..0ddba406cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -297,10 +297,11 @@ public class LocationPath { /** * provide file type for BE. + * * @param location the location is from fs.listFile * @return on BE, we will use TFileType to get the suitable client to access storage. */ - public static TFileType getTFileType(String location) { + public static TFileType getTFileTypeForBE(String location) { if (location == null || location.isEmpty()) { return null; } @@ -314,12 +315,13 @@ public class LocationPath { case OBS: case BOS: case GCS: + // ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access. + case COSN: // now we only support S3 client for object storage on BE return TFileType.FILE_S3; case HDFS: case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss. case VIEWFS: - case COSN: return TFileType.FILE_HDFS; case GFS: case JFS: @@ -346,12 +348,12 @@ public class LocationPath { case OBS: case BOS: case GCS: + case COSN: // All storage will use s3 client to access on BE, so need convert to s3 return new Path(convertToS3(location)); case HDFS: case OSS_HDFS: case VIEWFS: - case COSN: case GFS: case JFS: case OFS: diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java index 98790bc9e83..2d40af321fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java @@ -17,18 +17,8 @@ package org.apache.doris.common.util; -import org.apache.doris.catalog.HdfsResource; -import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.credentials.CloudCredential; -import org.apache.doris.datasource.property.constants.CosProperties; -import org.apache.doris.datasource.property.constants.ObsProperties; -import org.apache.doris.datasource.property.constants.OssProperties; -import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; @@ -43,138 +33,10 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; -import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URISyntaxException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Map; public class S3Util { - private static final Logger LOG = LogManager.getLogger(S3Util.class); - - public static boolean isObjStorage(String location) { - return isObjStorageUseS3Client(location) - // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues - || (location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN)) - || location.startsWith(FeConstants.FS_PREFIX_OSS) - || location.startsWith(FeConstants.FS_PREFIX_OBS); - } - - private static boolean isObjStorageUseS3Client(String location) { - return location.startsWith(FeConstants.FS_PREFIX_S3) - || location.startsWith(FeConstants.FS_PREFIX_S3A) - || location.startsWith(FeConstants.FS_PREFIX_S3N) - || location.startsWith(FeConstants.FS_PREFIX_GCS) - || location.startsWith(FeConstants.FS_PREFIX_BOS); - } - - private static boolean isS3EndPoint(String location, Map<String, String> props) { - if (props.containsKey(ObsProperties.ENDPOINT) - || props.containsKey(OssProperties.ENDPOINT) - || props.containsKey(CosProperties.ENDPOINT)) { - return false; - } - // wide check range for the compatibility of s3 properties - return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT)) - && isObjStorage(location); - } - - /** - * The converted path is used for FE to get metadata - * @param location origin location - * @return metadata location path. just convert when storage is compatible with s3 client. - */ - public static String convertToS3IfNecessary(String location, Map<String, String> props) { - LOG.debug("try convert location to s3 prefix: " + location); - // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. - if (isS3EndPoint(location, props) || isObjStorageUseS3Client(location)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new RuntimeException("No '://' found in location: " + location); - } - return "s3" + location.substring(pos); - } - return normalizedLocation(location, props); - } - - private static String normalizedLocation(String location, Map<String, String> props) { - try { - if (location.startsWith(HdfsResource.HDFS_PREFIX)) { - return normalizedHdfsPath(location, props); - } - return location; - } catch (URISyntaxException | UnsupportedEncodingException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - private static String normalizedHdfsPath(String location, Map<String, String> props) - throws URISyntaxException, UnsupportedEncodingException { - // Hive partition may contain special characters such as ' ', '<', '>' and so on. - // Need to encode these characters before creating URI. - // But doesn't encode '/' and ':' so that we can get the correct uri host. - location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()).replace("%2F", "/").replace("%3A", ":"); - URI normalizedUri = new URI(location); - // compatible with 'hdfs:///' or 'hdfs:/' - if (StringUtils.isEmpty(normalizedUri.getHost())) { - location = URLDecoder.decode(location, StandardCharsets.UTF_8.name()); - String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//"; - String brokenPrefix = HdfsResource.HDFS_PREFIX + "/"; - if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) { - location = location.replace(brokenPrefix, normalizedPrefix); - } - // Need add hdfs host to location - String host = props.get(HdfsResource.DSF_NAMESERVICES); - if (StringUtils.isNotEmpty(host)) { - // Replace 'hdfs://key/' to 'hdfs://name_service/key/' - // Or hdfs:///abc to hdfs://name_service/abc - // TODO: check host in path when the 'dfs.nameservices' has multiple hosts - return location.replace(normalizedPrefix, normalizedPrefix + host + "/"); - } else { - // 'hdfs://null/' equals the 'hdfs:///' - if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) { - // Do not support hdfs:///location - throw new RuntimeException("Invalid location with empty host: " + location); - } else { - // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE. - return location.replace(normalizedPrefix, "/"); - } - } - } - return URLDecoder.decode(location, StandardCharsets.UTF_8.name()); - } - - /** - * The converted path is used for BE - * @param location origin split path - * @return BE scan range path - */ - public static Path toScanRangeLocation(String location, Map<String, String> props) { - // All storage will use s3 client on BE. - if (isObjStorage(location)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new RuntimeException("No '://' found in location: " + location); - } - if (isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use oss location - // example: oss://examplebucket.cn-shanghai.oss-dls.aliyuncs.com/dir/file/0000.orc - location = "oss" + location.substring(pos); - } else { - location = "s3" + location.substring(pos); - } - } - return new Path(normalizedLocation(location, props)); - } - - public static boolean isHdfsOnOssEndpoint(String location) { - // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs". - // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen - return location.contains("oss-dls.aliyuncs"); - } public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) { StaticCredentialsProvider scp; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 72f413b5dec..3e98b887eeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -34,14 +34,13 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.CacheBulkLoader; -import org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.external.hive.util.HiveUtil; import org.apache.doris.fs.FileSystemCache; -import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.fs.RemoteFiles; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; @@ -361,7 +360,7 @@ public class HiveMetaStoreCache { String bindBrokerName) throws UserException { FileCacheValue result = new FileCacheValue(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity( + new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( location, bindBrokerName), jobConf, bindBrokerName)); result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf)); try { @@ -374,9 +373,10 @@ public class HiveMetaStoreCache { // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true); for (RemoteFile remoteFile : locatedFiles.files()) { - Path srcPath = remoteFile.getPath(); - Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties()); - if (!convertedPath.toString().equals(srcPath.toString())) { + String srcPath = remoteFile.getPath().toString(); + LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); + Path convertedPath = locationPath.toScanRangeLocation(); + if (!convertedPath.toString().equals(srcPath)) { remoteFile.setPath(convertedPath); } result.addFile(remoteFile); @@ -400,13 +400,12 @@ public class HiveMetaStoreCache { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); - String finalLocation = S3Util.convertToS3IfNecessary(key.location, - catalog.getCatalogProperty().getProperties()); + Map<String, String> props = catalog.getCatalogProperty().getProperties(); + LocationPath finalLocation = new LocationPath(key.location, props); // disable the fs cache in FileSystem, or it will always from new FileSystem // and save it in cache when calling FileInputFormat.setInputPaths(). try { - Path path = new Path(finalLocation); - URI uri = path.toUri(); + URI uri = finalLocation.getPath().toUri(); if (uri.getScheme() != null) { String scheme = uri.getScheme(); updateJobConf("fs." + scheme + ".impl.disable.cache", "true"); @@ -419,13 +418,13 @@ public class HiveMetaStoreCache { } catch (Exception e) { LOG.warn("unknown scheme in path: " + finalLocation, e); } - FileInputFormat.setInputPaths(jobConf, finalLocation); + FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result; InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false); // TODO: This is a temp config, will remove it after the HiveSplitter is stable. if (key.useSelfSplitter) { - result = getFileCache(finalLocation, inputFormat, jobConf, + result = getFileCache(finalLocation.get(), inputFormat, jobConf, key.getPartitionValues(), key.bindBrokerName); } else { InputSplit[] splits; @@ -442,8 +441,9 @@ public class HiveMetaStoreCache { for (int i = 0; i < splits.length; i++) { org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]); // todo: get modification time - Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(), - catalog.getProperties()); + String dataFilePath = fs.getPath().toString(); + LocationPath locationPath = new LocationPath(dataFilePath, catalog.getProperties()); + Path splitFilePath = locationPath.toScanRangeLocation(); result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null)); } } @@ -815,7 +815,7 @@ public class HiveMetaStoreCache { String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(), + LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), bindBrokerName), jobConf, bindBrokerName)); Status status = fs.exists(acidVersionPath); if (status != Status.OK) { @@ -838,7 +838,7 @@ public class HiveMetaStoreCache { String location = delta.getPath().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(location, bindBrokerName), + LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); if (delta.isDeleteDelta()) { @@ -858,7 +858,7 @@ public class HiveMetaStoreCache { String location = directory.getBaseDirectory().toString(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( - FileSystemFactory.getFSIdentity(location, bindBrokerName), + LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); locatedFiles.files().stream().filter( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index 39b68dda144..66396e2c337 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -17,7 +17,7 @@ package org.apache.doris.datasource.property; -import org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InitCatalogLog.Type; @@ -301,7 +301,7 @@ public class PropertyConverter { ossProperties.put("fs.oss.impl.disable.cache", "true"); ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss")); boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false")); - if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) { + if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) { // use endpoint or enable hdfs rewriteHdfsOnOssProperties(ossProperties, endpoint); } @@ -321,7 +321,7 @@ public class PropertyConverter { } private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties, String endpoint) { - if (!S3Util.isHdfsOnOssEndpoint(endpoint)) { + if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) { // just for robustness here, avoid wrong endpoint when oss-hdfs is enabled. // convert "oss-cn-beijing.aliyuncs.com" to "cn-beijing.oss-dls.aliyuncs.com" // reference link: https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index e54a73bbff3..63f552a8ab8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -18,9 +18,6 @@ package org.apache.doris.fs; import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.Pair; -import org.apache.doris.common.util.S3Util; import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.S3FileSystem; @@ -28,12 +25,10 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.fs.remote.dfs.JFSFileSystem; import org.apache.doris.fs.remote.dfs.OFSFileSystem; -import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -56,35 +51,6 @@ public class FileSystemFactory { } } - public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) { - FileSystemType fsType; - if (bindBrokerName != null) { - fsType = FileSystemType.BROKER; - } else if (S3Util.isObjStorage(location)) { - if (S3Util.isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use hdfs lib to access oss. - fsType = FileSystemType.DFS; - } else { - fsType = FileSystemType.S3; - } - } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS) - || location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) { - fsType = FileSystemType.DFS; - } else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) { - // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) { - fsType = FileSystemType.OFS; - } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - fsType = FileSystemType.JFS; - } else { - throw new UnsupportedOperationException("Unknown file system for location: " + location); - } - - Path path = new Path(location); - URI uri = path.toUri(); - String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); - return Pair.of(fsType, fsIdent); - } - public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf, String bindBrokerName) { Map<String, String> properties = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index c2cc969cd6d..0293eac192f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -29,11 +29,9 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; -import org.apache.doris.common.util.S3Util; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; @@ -75,7 +73,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -452,33 +449,4 @@ public abstract class FileQueryScanNode extends FileScanNode { protected abstract TableIf getTargetTable() throws UserException; protected abstract Map<String, String> getLocationProperties() throws UserException; - - protected static Optional<TFileType> getTFileType(String location) { - if (location != null && !location.isEmpty()) { - if (S3Util.isObjStorage(location)) { - if (S3Util.isHdfsOnOssEndpoint(location)) { - // if hdfs service is enabled on oss, use hdfs lib to access oss. - return Optional.of(TFileType.FILE_HDFS); - } - return Optional.of(TFileType.FILE_S3); - } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) { - return Optional.of(TFileType.FILE_HDFS); - } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { - return Optional.of(TFileType.FILE_LOCAL); - } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) { - return Optional.of(TFileType.FILE_BROKER); - } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { - return Optional.of(TFileType.FILE_BROKER); - } - } - return Optional.empty(); - } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 58b93112477..0e1f3438c4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -66,6 +67,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.stream.Collectors; @@ -337,8 +339,8 @@ public class HiveScanNode extends FileQueryScanNode { if (bindBrokerName != null) { return TFileType.FILE_BROKER; } - return getTFileType(location).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); + return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 4da4e85453b..9d601e71daa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -26,7 +26,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; @@ -43,7 +43,6 @@ import org.apache.doris.thrift.THudiFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -294,9 +293,11 @@ public class HudiScanNode extends HiveScanNode { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); long fileSize = baseFile.getFileSize(); - splits.add(new FileSplit(S3Util.toScanRangeLocation(filePath, Maps.newHashMap()), - 0, fileSize, fileSize, new String[0], - partition.getPartitionValues())); + // Need add hdfs host to location + LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); + Path splitFilePath = locationPath.toScanRangeLocation(); + splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, + new String[0], partition.getPartitionValues())); }); } else { fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index eacf372e4fc..d88d18d91af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -31,7 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3Util; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.external.iceberg.util.IcebergUtils; @@ -151,7 +151,9 @@ public class IcebergScanNode extends FileQueryScanNode { for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) { TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc(); String deleteFilePath = filter.getDeleteFilePath(); - deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString()); + LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig()); + Path splitDeletePath = locationPath.toScanRangeLocation(); + deleteFileDesc.setPath(splitDeletePath.toString()); if (filter instanceof IcebergDeleteFileFilter.PositionDelete) { fileDesc.setContent(FileContent.POSITION_DELETES.id()); IcebergDeleteFileFilter.PositionDelete positionDelete = @@ -243,8 +245,8 @@ public class IcebergScanNode extends FileQueryScanNode { // Counts the number of partitions read partitionPathSet.add(structLike.toString()); } - - Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties()); + LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties()); + Path finalDataFilePath = locationPath.toScanRangeLocation(); IcebergSplit split = new IcebergSplit( finalDataFilePath, splitTask.start(), @@ -345,7 +347,7 @@ public class IcebergScanNode extends FileQueryScanNode { @Override public TFileType getLocationType(String location) throws UserException { final String fLocation = normalizeLocation(location); - return getTFileType(fLocation).orElseThrow(() -> + return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java index cf5edb3b914..07de5aeda79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java @@ -213,8 +213,8 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException { - return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for paimon table ")); + return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for paimon table ")); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java new file mode 100644 index 00000000000..71ee9100ffc --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.fs.FileSystemType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +public class LocationPathTest { + + @Test + public void testHdfsLocationConvert() { + // non HA + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("hdfs://dir/file.path", rangeProps); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); + + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + + // HA props + Map<String, String> props = new HashMap<>(); + props.put("dfs.nameservices", "ns"); + locationPath = new LocationPath("hdfs:///dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://") + && !locationPath.get().startsWith("hdfs:///")); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://") && !beLocation.startsWith("hdfs:///")); + + // nonstandard '/' for hdfs path + locationPath = new LocationPath("hdfs:/dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("hdfs://")); + + // empty ha nameservices + props.put("dfs.nameservices", ""); + locationPath = new LocationPath("hdfs:/dir/file.path", props); + + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(locationPath.get().startsWith("/dir") + && !locationPath.get().startsWith("hdfs://")); + Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://")); + } + + + @Test + public void testJFSLocationConvert() { + String loc; + Map<String, String> rangeProps = new HashMap<>(); + + LocationPath locationPath = new LocationPath("jfs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("jfs://")); + // BE + loc = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(loc.startsWith("jfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS); + } + + @Test + public void testGSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + + // use s3 client to access gs + LocationPath locationPath = new LocationPath("gs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("s3://")); + // BE + String beLoc = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLoc.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3); + } + + @Test + public void testOSSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("oss://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("oss://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + + locationPath = new LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + + } + + @Test + public void testCOSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("cos://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("cos://")); + String beLocation = locationPath.toScanRangeLocation().toString(); + // BE + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + + locationPath = new LocationPath("cosn://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("cosn://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + + locationPath = new LocationPath("ofs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("ofs://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("ofs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS); + + // GFS is now equals to DFS + locationPath = new LocationPath("gfs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("gfs://")); + // BE + beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("gfs://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); + } + + @Test + public void testOBSLocationConvert() { + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("obs://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("obs://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("s3://")); + Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); + } + + @Test + public void testUnsupportedLocationConvert() { + // when use unknown location, pass to BE + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().startsWith("unknown://")); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.startsWith("unknown://")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java deleted file mode 100644 index 70bad23e01f..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import org.apache.doris.fs.FileSystemFactory; -import org.apache.doris.fs.FileSystemType; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -public class S3UtilTest { - @Test - public void testLocationConvert() { - String loc; - loc = S3Util.convertToS3IfNecessary("hdfs://dir/file.path", new HashMap<>()); - Assertions.assertTrue(loc.startsWith("hdfs://")); - - Map<String, String> props = new HashMap<>(); - props.put("dfs.nameservices", "ns"); - loc = S3Util.convertToS3IfNecessary("hdfs:///dir/file.path", props); - Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///")); - loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props); - Assertions.assertTrue(loc.startsWith("hdfs://")); - props.put("dfs.nameservices", ""); - loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props); - Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://")); - - loc = S3Util.convertToS3IfNecessary("oss://test.com", props); - Assertions.assertTrue(loc.startsWith("oss://")); - - loc = S3Util.convertToS3IfNecessary("gcs://test.com", props); - Assertions.assertTrue(loc.startsWith("gcs://")); - - loc = S3Util.convertToS3IfNecessary("cos://test.com", props); - Assertions.assertTrue(loc.startsWith("cos://")); - - loc = S3Util.convertToS3IfNecessary("cosn://test.com", props); - Assertions.assertTrue(loc.startsWith("cosn://")); - - loc = S3Util.convertToS3IfNecessary("obs://test.com", props); - Assertions.assertTrue(loc.startsWith("obs://")); - } - - - @Test - public void testScanRangeLocationConvert() throws Exception { - String loc; - Map<String, String> rangeProps = new HashMap<>(); - loc = S3Util.toScanRangeLocation("hdfs://dir/file.path", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("hdfs://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS); - - Map<String, String> props = new HashMap<>(); - props.put("dfs.nameservices", "ns"); - loc = S3Util.toScanRangeLocation("hdfs:///dir/file.path", props).toString(); - Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///")); - loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString(); - Assertions.assertTrue(loc.startsWith("hdfs://")); - props.put("dfs.nameservices", ""); - loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString(); - Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://")); - - loc = S3Util.toScanRangeLocation("oss://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("oss://test.oss-dls.aliyuncs.com/path", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("oss://test.oss-dls.aliyuncs")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS); - - loc = S3Util.toScanRangeLocation("cos://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("cosn://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("cosn://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.OFS); - - loc = S3Util.toScanRangeLocation("obs://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("s3://")); - Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3); - - loc = S3Util.toScanRangeLocation("unknown://test.com", rangeProps).toString(); - Assertions.assertTrue(loc.startsWith("unknown://")); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org