This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9640e2de44d [fix](catalog) refactor location path and support default fs #39116 (#39203)
9640e2de44d is described below

commit 9640e2de44dd91f185358f5cf0b25ad184342903
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat Aug 24 16:05:13 2024 +0800

    [fix](catalog) refactor location path and support default fs #39116 (#39203)
---
 .../org/apache/doris/common/util/LocationPath.java | 433 +++++++++++----------
 .../apache/doris/datasource/FileQueryScanNode.java |  96 ++---
 .../org/apache/doris/datasource/FileScanNode.java  |   6 +-
 .../org/apache/doris/datasource/FileSplit.java     |  25 +-
 .../org/apache/doris/datasource/SplitCreator.java  |   5 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  30 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  20 +-
 .../doris/datasource/hive/source/HiveSplit.java    |  12 +-
 .../hudi/source/COWIncrementalRelation.java        |  11 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |  13 +-
 .../doris/datasource/hudi/source/HudiSplit.java    |   6 +-
 .../datasource/iceberg/source/IcebergScanNode.java |  48 +--
 .../datasource/iceberg/source/IcebergSplit.java    |  20 +-
 .../maxcompute/source/MaxComputeScanNode.java      |  34 +-
 .../maxcompute/source/MaxComputeSplit.java         |  19 +-
 .../datasource/paimon/source/PaimonScanNode.java   |  22 +-
 .../datasource/paimon/source/PaimonSource.java     |   1 -
 .../datasource/paimon/source/PaimonSplit.java      |  20 +-
 .../doris/datasource/tvf/source/TVFScanNode.java   |  23 +-
 .../org/apache/doris/planner/HiveTableSink.java    |   2 +-
 .../org/apache/doris/planner/IcebergTableSink.java |   2 +-
 .../ExternalFileTableValuedFunction.java           |  18 -
 .../apache/doris/common/util/LocationPathTest.java |  20 +-
 .../doris/planner/FederationBackendPolicyTest.java |  77 ++--
 24 files changed, 433 insertions(+), 530 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index eccb483578a..267e20a1f95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.property.constants.CosProperties;
 import org.apache.doris.datasource.property.constants.ObsProperties;
 import org.apache.doris.datasource.property.constants.OssProperties;
@@ -27,7 +28,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.fs.FileSystemType;
 import org.apache.doris.thrift.TFileType;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
@@ -49,10 +52,11 @@ public class LocationPath {
     private static final Logger LOG = LogManager.getLogger(LocationPath.class);
     private static final String SCHEME_DELIM = "://";
     private static final String NONSTANDARD_SCHEME_DELIM = ":/";
-    private final LocationType locationType;
+    private final Scheme scheme;
     private final String location;
+    private final boolean isBindBroker;
 
-    public enum LocationType {
+    public enum Scheme {
         HDFS,
         LOCAL, // Local File
         BOS, // Baidu
@@ -74,122
+78,230 @@ public class LocationPath { NOSCHEME // no scheme info } - private LocationPath(String location) { - this(location, Collections.emptyMap(), true); + @VisibleForTesting + public LocationPath(String location) { + this(location, Maps.newHashMap(), true); } public LocationPath(String location, Map<String, String> props) { this(location, props, true); } - public LocationPath(String location, Map<String, String> props, boolean convertPath) { - String scheme = parseScheme(location).toLowerCase(); - if (scheme.isEmpty()) { - locationType = LocationType.NOSCHEME; - this.location = location; - } else { - switch (scheme) { - case FeConstants.FS_PREFIX_HDFS: - locationType = LocationType.HDFS; - // Need add hdfs host to location - String host = props.get(HdfsResource.DSF_NAMESERVICES); - this.location = convertPath ? normalizedHdfsPath(location, host) : location; - break; - case FeConstants.FS_PREFIX_S3: - locationType = LocationType.S3; - this.location = location; - break; - case FeConstants.FS_PREFIX_S3A: - locationType = LocationType.S3A; - this.location = convertPath ? convertToS3(location) : location; - break; - case FeConstants.FS_PREFIX_S3N: - // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. - locationType = LocationType.S3N; - this.location = convertPath ? convertToS3(location) : location; - break; - case FeConstants.FS_PREFIX_BOS: - locationType = LocationType.BOS; - // use s3 client to access - this.location = convertPath ? convertToS3(location) : location; - break; - case FeConstants.FS_PREFIX_GCS: - locationType = LocationType.GCS; - // use s3 client to access - this.location = convertPath ? convertToS3(location) : location; - break; - case FeConstants.FS_PREFIX_OSS: - if (isHdfsOnOssEndpoint(location)) { - locationType = LocationType.OSS_HDFS; - this.location = location; - } else { - if (useS3EndPoint(props)) { - this.location = convertPath ? convertToS3(location) : location; - } else { - this.location = location; - } - locationType = LocationType.OSS; - } - break; - case FeConstants.FS_PREFIX_COS: - if (useS3EndPoint(props)) { - this.location = convertPath ? convertToS3(location) : location; - } else { - this.location = location; - } - locationType = LocationType.COS; - break; - case FeConstants.FS_PREFIX_OBS: + private LocationPath(String originLocation, Map<String, String> props, boolean convertPath) { + isBindBroker = props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME); + String tmpLocation = originLocation; + if (!originLocation.contains(SCHEME_DELIM)) { + // Sometimes the file path does not contain scheme, need to add default fs + // eg, /path/to/file.parquet -> hdfs://nn/path/to/file.parquet + // the default fs is from the catalog properties + String defaultFS = props.getOrDefault(HdfsResource.HADOOP_FS_NAME, ""); + tmpLocation = defaultFS + originLocation; + } + String scheme = parseScheme(tmpLocation).toLowerCase(); + switch (scheme) { + case "": + this.scheme = Scheme.NOSCHEME; + break; + case FeConstants.FS_PREFIX_HDFS: + this.scheme = Scheme.HDFS; + // Need add hdfs host to location + String host = props.get(HdfsResource.DSF_NAMESERVICES); + tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host) : tmpLocation; + break; + case FeConstants.FS_PREFIX_S3: + this.scheme = Scheme.S3; + break; + case FeConstants.FS_PREFIX_S3A: + this.scheme = Scheme.S3A; + tmpLocation = convertPath ? 
convertToS3(tmpLocation) : tmpLocation; + break; + case FeConstants.FS_PREFIX_S3N: + // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. + this.scheme = Scheme.S3N; + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; + break; + case FeConstants.FS_PREFIX_BOS: + this.scheme = Scheme.BOS; + // use s3 client to access + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; + break; + case FeConstants.FS_PREFIX_GCS: + this.scheme = Scheme.GCS; + // use s3 client to access + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; + break; + case FeConstants.FS_PREFIX_OSS: + if (isHdfsOnOssEndpoint(tmpLocation)) { + this.scheme = Scheme.OSS_HDFS; + } else { if (useS3EndPoint(props)) { - this.location = convertPath ? convertToS3(location) : location; - } else { - this.location = location; + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; } - locationType = LocationType.OBS; - break; - case FeConstants.FS_PREFIX_OFS: - locationType = LocationType.OFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_JFS: - locationType = LocationType.JFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_GFS: - locationType = LocationType.GFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_COSN: - // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues - locationType = LocationType.COSN; - this.location = location; - break; - case FeConstants.FS_PREFIX_LAKEFS: - locationType = LocationType.COSN; - this.location = normalizedLakefsPath(location); - break; - case FeConstants.FS_PREFIX_VIEWFS: - locationType = LocationType.VIEWFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_FILE: - locationType = LocationType.LOCAL; - this.location = location; - break; - default: - locationType = LocationType.UNKNOWN; - this.location = location; - } + this.scheme = Scheme.OSS; + } + break; + case FeConstants.FS_PREFIX_COS: + if (useS3EndPoint(props)) { + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; + } + this.scheme = Scheme.COS; + break; + case FeConstants.FS_PREFIX_OBS: + if (useS3EndPoint(props)) { + tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation; + } + this.scheme = Scheme.OBS; + break; + case FeConstants.FS_PREFIX_OFS: + this.scheme = Scheme.OFS; + break; + case FeConstants.FS_PREFIX_JFS: + this.scheme = Scheme.JFS; + break; + case FeConstants.FS_PREFIX_GFS: + this.scheme = Scheme.GFS; + break; + case FeConstants.FS_PREFIX_COSN: + // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues + this.scheme = Scheme.COSN; + break; + case FeConstants.FS_PREFIX_LAKEFS: + this.scheme = Scheme.COSN; + tmpLocation = normalizedLakefsPath(tmpLocation); + break; + case FeConstants.FS_PREFIX_VIEWFS: + this.scheme = Scheme.VIEWFS; + break; + case FeConstants.FS_PREFIX_FILE: + this.scheme = Scheme.LOCAL; + break; + default: + this.scheme = Scheme.UNKNOWN; + break; + } + this.location = tmpLocation; + } + + // Return true if this location is with oss-hdfs + public static boolean isHdfsOnOssEndpoint(String location) { + // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs". + // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen + return location.contains("oss-dls.aliyuncs"); + } + + // Return the file system type and the file system identity. + // The file system identity is the scheme and authority of the URI, eg. 
"hdfs://host:port" or "s3://bucket". + public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) { + LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), true); + FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType(); + URI uri = locationPath.getPath().toUri(); + String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); + return Pair.of(fsType, fsIdent); + } + + /** + * provide file type for BE. + * + * @param location the location is from fs.listFile + * @return on BE, we will use TFileType to get the suitable client to access storage. + */ + public static TFileType getTFileTypeForBE(String location) { + if (location == null || location.isEmpty()) { + return null; + } + LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false); + return locationPath.getTFileTypeForBE(); + } + + public static String getTempWritePath(String loc, String prefix) { + Path tempRoot = new Path(loc, prefix); + Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", "")); + return tempPath.toString(); + } + + public TFileType getTFileTypeForBE() { + switch (scheme) { + case S3: + case S3A: + case S3N: + case COS: + case OSS: + case OBS: + case BOS: + case GCS: + // ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access. + case COSN: + case LAKEFS: + // now we only support S3 client for object storage on BE + return TFileType.FILE_S3; + case HDFS: + case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss. + case VIEWFS: + return TFileType.FILE_HDFS; + case GFS: + case JFS: + case OFS: + return TFileType.FILE_BROKER; + case LOCAL: + return TFileType.FILE_LOCAL; + default: + return null; + } + } + + /** + * The converted path is used for BE + * + * @return BE scan range path + */ + public Path toStorageLocation() { + switch (scheme) { + case S3: + case S3A: + case S3N: + case COS: + case OSS: + case OBS: + case BOS: + case GCS: + case COSN: + // All storage will use s3 client to access on BE, so need convert to s3 + return new Path(convertToS3(location)); + case HDFS: + case OSS_HDFS: + case VIEWFS: + case GFS: + case JFS: + case OFS: + case LOCAL: + default: + return getPath(); } } - private static String parseScheme(String location) { + public Scheme getScheme() { + return scheme; + } + + public String get() { + return location; + } + + public Path getPath() { + return new Path(location); + } + + public boolean isBindBroker() { + return isBindBroker; + } + + private static String parseScheme(String finalLocation) { String scheme = ""; - String[] schemeSplit = location.split(SCHEME_DELIM); + String[] schemeSplit = finalLocation.split(SCHEME_DELIM); if (schemeSplit.length > 1) { scheme = schemeSplit[0]; } else { - schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM); + schemeSplit = finalLocation.split(NONSTANDARD_SCHEME_DELIM); if (schemeSplit.length > 1) { scheme = schemeSplit[0]; } @@ -198,9 +310,9 @@ public class LocationPath { // if not get scheme, need consider /path/to/local to no scheme if (scheme.isEmpty()) { try { - Paths.get(location); + Paths.get(finalLocation); } catch (InvalidPathException exception) { - throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location); + throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + finalLocation); } } @@ -217,14 +329,9 @@ public class 
LocationPath { return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT)); } - public static boolean isHdfsOnOssEndpoint(String location) { - // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs". - // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen - return location.contains("oss-dls.aliyuncs"); - } - /** - * The converted path is used for FE to get metadata + * The converted path is used for FE to get metadata. + * Change http://xxxx to s3://xxxx * * @param location origin location * @return metadata location path. just convert when storage is compatible with s3 client. @@ -291,17 +398,9 @@ public class LocationPath { } } - public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) { - LocationPath locationPath = new LocationPath(location); - FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType(); - URI uri = locationPath.getPath().toUri(); - String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority()); - return Pair.of(fsType, fsIdent); - } - private FileSystemType getFileSystemType() { FileSystemType fsType; - switch (locationType) { + switch (scheme) { case S3: case S3A: case S3N: @@ -339,98 +438,6 @@ public class LocationPath { return fsType; } - /** - * provide file type for BE. - * - * @param location the location is from fs.listFile - * @return on BE, we will use TFileType to get the suitable client to access storage. - */ - public static TFileType getTFileTypeForBE(String location) { - if (location == null || location.isEmpty()) { - return null; - } - LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false); - return locationPath.getTFileTypeForBE(); - } - - public TFileType getTFileTypeForBE() { - switch (this.getLocationType()) { - case S3: - case S3A: - case S3N: - case COS: - case OSS: - case OBS: - case BOS: - case GCS: - // ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access. - case COSN: - case LAKEFS: - // now we only support S3 client for object storage on BE - return TFileType.FILE_S3; - case HDFS: - case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss. 
- case VIEWFS: - return TFileType.FILE_HDFS; - case GFS: - case JFS: - case OFS: - return TFileType.FILE_BROKER; - case LOCAL: - return TFileType.FILE_LOCAL; - default: - return null; - } - } - - /** - * The converted path is used for BE - * - * @return BE scan range path - */ - public Path toStorageLocation() { - switch (locationType) { - case S3: - case S3A: - case S3N: - case COS: - case OSS: - case OBS: - case BOS: - case GCS: - case COSN: - // All storage will use s3 client to access on BE, so need convert to s3 - return new Path(convertToS3(location)); - case HDFS: - case OSS_HDFS: - case VIEWFS: - case GFS: - case JFS: - case OFS: - case LOCAL: - default: - return getPath(); - } - } - - public LocationType getLocationType() { - return locationType; - } - - public String get() { - return location; - } - - public Path getPath() { - return new Path(location); - } - - public static String getTempWritePath(String loc, String prefix) { - Path tempRoot = new Path(loc, prefix); - Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", "")); - return tempPath.toString(); - } - @Override public String toString() { return get(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index a5be5e56cd2..f8048b5fb66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -35,10 +35,8 @@ import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; -import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.source.HiveSplit; -import org.apache.doris.datasource.iceberg.source.IcebergSplit; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -268,7 +266,7 @@ public abstract class FileQueryScanNode extends FileScanNode { boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL; if (isCsvOrJson || isWal) { params.setFileAttributes(getFileAttributes()); - if (getLocationType() == TFileType.FILE_STREAM) { + if (isFileStreamType()) { params.setFileType(TFileType.FILE_STREAM); FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf(); @@ -309,19 +307,13 @@ public abstract class FileQueryScanNode extends FileScanNode { if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); } - if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) { + if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) { return; } selectedSplitNum = numApproximateSplits(); - TFileType locationType; FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit(); - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } + TFileType locationType = fileSplit.getLocationType(); totalFileSize = fileSplit.getLength() * selectedSplitNum; long 
maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. @@ -351,7 +343,7 @@ public abstract class FileQueryScanNode extends FileScanNode { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); } selectedSplitNum = inputSplits.size(); - if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) { + if (inputSplits.isEmpty() && !isFileStreamType()) { return; } Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits); @@ -379,14 +371,6 @@ public abstract class FileQueryScanNode extends FileScanNode { Split split, List<String> pathPartitionKeys) throws UserException { FileSplit fileSplit = (FileSplit) split; - TFileType locationType; - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } - TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. // Otherwise, use the values in file path. @@ -396,41 +380,42 @@ public abstract class FileQueryScanNode extends FileScanNode { isACID = hiveSplit.isACID(); } List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, + ? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); TFileCompressType fileCompressType = getFileCompressType(fileSplit); rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) fileSplit; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); + if (fileSplit instanceof HiveSplit) { + if (isACID) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); + AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); + TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); + transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); + List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>(); + for (DeleteDeltaInfo 
deleteDeltaInfo : acidInfo.getDeleteDeltas()) { + TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); + deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); + deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); + deleteDeltaDescs.add(deleteDeltaDesc); + } + transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); + tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } else { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value()); + rangeDesc.setTableFormatParams(tableFormatFileDesc); } - transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } else if (fileSplit instanceof HiveSplit) { - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value()); - rangeDesc.setTableFormatParams(tableFormatFileDesc); } setScanParams(rangeDesc, fileSplit); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); - setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties); location.setBackendId(backend.getId()); location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); @@ -493,8 +478,7 @@ public abstract class FileQueryScanNode extends FileScanNode { } private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath, - List<String> columnsFromPathKeys, TFileType locationType) - throws UserException { + List<String> columnsFromPathKeys) { TFileRangeDesc rangeDesc = new TFileRangeDesc(); rangeDesc.setStartOffset(fileSplit.getStart()); rangeDesc.setSize(fileSplit.getLength()); @@ -504,10 +488,10 @@ public abstract class FileQueryScanNode extends FileScanNode { rangeDesc.setColumnsFromPath(columnsFromPath); rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); - rangeDesc.setFileType(locationType); - rangeDesc.setPath(fileSplit.getPath().toString()); - if (locationType == TFileType.FILE_HDFS) { - URI fileUri = fileSplit.getPath().toUri(); + rangeDesc.setFileType(fileSplit.getLocationType()); + rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString()); + if (fileSplit.getLocationType() == TFileType.FILE_HDFS) { + URI fileUri = fileSplit.getPath().getPath().toUri(); rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority()); } rangeDesc.setModificationTime(fileSplit.getModificationTime()); @@ -555,14 +539,16 @@ public abstract class FileQueryScanNode extends FileScanNode { return scanRangeLocations.size(); } - protected abstract TFileType getLocationType() throws UserException; - - protected abstract TFileType getLocationType(String location) throws UserException; + // Return true if this is a TFileType.FILE_STREAM type. + // Currently, only TVFScanNode may be TFileType.FILE_STREAM type. 
+ protected boolean isFileStreamType() throws UserException { + return false; + } protected abstract TFileFormatType getFileFormatType() throws UserException; protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException { - return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString()); + return Util.inferFileCompressTypeByPath(fileSplit.getPathString()); } protected TFileAttributes getFileAttributes() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index fda93c0c3fa..2868d6ebcf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -46,7 +47,6 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -241,14 +241,14 @@ public abstract class FileScanNode extends ExternalScanNode { } } - protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, + protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length, long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator) throws IOException { if (blockLocations == null) { blockLocations = new BlockLocation[0]; } List<Split> result = Lists.newArrayList(); - TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString()); + TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get()); if (!splittable || compressType != TFileCompressType.PLAIN) { if (LOG.isDebugEnabled()) { LOG.debug("Path {} is not splittable.", path); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java index b02e8be0cd7..7eaa87b74aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java @@ -17,16 +17,17 @@ package org.apache.doris.datasource; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.spi.Split; +import org.apache.doris.thrift.TFileType; import lombok.Data; -import org.apache.hadoop.fs.Path; import java.util.List; @Data public class FileSplit implements Split { - public Path path; + public LocationPath path; public long start; // length of this split, in bytes public long length; @@ -43,27 +44,30 @@ public class FileSplit implements Split { public List<String> partitionValues; public List<String> alternativeHosts; + // the location type for BE, eg: HDFS, LOCAL, S3 + protected TFileType locationType; - public FileSplit(Path path, long start, long length, long fileLength, + public FileSplit(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List<String> 
partitionValues) { this.path = path; this.start = start; this.length = length; this.fileLength = fileLength; - this.modificationTime = modificationTime; + // BE requires modification time to be non-negative. + this.modificationTime = modificationTime < 0 ? 0 : modificationTime; this.hosts = hosts == null ? new String[0] : hosts; this.partitionValues = partitionValues; - } - - public FileSplit(Path path, long start, long length, long fileLength, - String[] hosts, List<String> partitionValues) { - this(path, start, length, fileLength, 0, hosts, partitionValues); + this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE(); } public String[] getHosts() { return hosts; } + public TFileType getLocationType() { + return locationType; + } + @Override public Object getInfo() { return null; @@ -79,7 +83,8 @@ public class FileSplit implements Split { public static final FileSplitCreator DEFAULT = new FileSplitCreator(); @Override - public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts, + public Split create(LocationPath path, long start, long length, long fileLength, + long modificationTime, String[] hosts, List<String> partitionValues) { return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java index 095a9a5eccc..4df30459db7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java @@ -17,13 +17,12 @@ package org.apache.doris.datasource; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.spi.Split; -import org.apache.hadoop.fs.Path; - import java.util.List; public interface SplitCreator { - Split create(Path path, long start, long length, long fileLength, + Split create(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List<String> partitionValues); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index ad36dc221d8..006ed83413a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -369,11 +369,7 @@ public class HiveMetaStoreCache { for (RemoteFile remoteFile : remoteFiles) { String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); - Path convertedPath = locationPath.toStorageLocation(); - if (!convertedPath.toString().equals(srcPath)) { - remoteFile.setPath(convertedPath); - } - result.addFile(remoteFile); + result.addFile(remoteFile, locationPath); } } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { // User may manually remove partition under HDFS, in this case, @@ -813,14 +809,17 @@ public class HiveMetaStoreCache { if (status.ok()) { if (delta.isDeleteDelta()) { List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) .collect(Collectors.toList()); deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); continue; } 
remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(fileCacheValue::addFile); + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), + catalog.getProperties()); + fileCacheValue.addFile(file, path); + }); } else { throw new RuntimeException(status.getErrMsg()); } @@ -837,8 +836,12 @@ public class HiveMetaStoreCache { Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(fileCacheValue::addFile); + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .forEach(file -> { + LocationPath path = new LocationPath(file.getPath().toString(), + catalog.getProperties()); + fileCacheValue.addFile(file, path); + }); } else { throw new RuntimeException(status.getErrMsg()); } @@ -998,11 +1001,11 @@ public class HiveMetaStoreCache { private AcidInfo acidInfo; - public void addFile(RemoteFile file) { + public void addFile(RemoteFile file, LocationPath locationPath) { if (isFileVisible(file.getPath())) { HiveFileStatus status = new HiveFileStatus(); status.setBlockLocations(file.getBlockLocations()); - status.setPath(file.getPath()); + status.setPath(locationPath); status.length = file.getSize(); status.blockSize = file.getBlockSize(); status.modificationTime = file.getModificationTime(); @@ -1014,7 +1017,6 @@ public class HiveMetaStoreCache { return partitionValues == null ? 0 : partitionValues.size(); } - public AcidInfo getAcidInfo() { return acidInfo; } @@ -1062,7 +1064,7 @@ public class HiveMetaStoreCache { @Data public static class HiveFileStatus { BlockLocation[] blockLocations; - Path path; + LocationPath path; long length; long blockSize; long modificationTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index be722b31c7b..db4161a4e23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -27,10 +27,8 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; @@ -52,7 +50,6 @@ import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; -import org.apache.doris.thrift.TFileType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -343,7 +340,7 @@ public class HiveScanNode extends FileQueryScanNode { allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), status.getBlockLocations(), status.getLength(), status.getModificationTime(), status.isSplittable(), status.getPartitionValues(), - new HiveSplitCreator(status.getAcidInfo()))); + new HiveSplitCreator(status.getAcidInfo()))); } } @@ -409,21 +406,6 @@ public class 
HiveScanNode extends FileQueryScanNode { return hmsTable; } - @Override - protected TFileType getLocationType() throws UserException { - return getLocationType(hmsTable.getRemoteTable().getSd().getLocation()); - } - - @Override - protected TFileType getLocationType(String location) throws UserException { - String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); - if (bindBrokerName != null) { - return TFileType.FILE_BROKER; - } - return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName())); - } - @Override public TFileFormatType getFileFormatType() throws UserException { TFileFormatType type = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java index 7c9345991fb..5dd63e734c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java @@ -17,18 +17,17 @@ package org.apache.doris.datasource.hive.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.SplitCreator; import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.spi.Split; -import org.apache.hadoop.fs.Path; - import java.util.List; public class HiveSplit extends FileSplit { - public HiveSplit(Path path, long start, long length, long fileLength, + private HiveSplit(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List<String> partitionValues, AcidInfo acidInfo) { super(path, start, length, fileLength, modificationTime, hosts, partitionValues); this.acidInfo = acidInfo; @@ -53,12 +52,9 @@ public class HiveSplit extends FileSplit { this.acidInfo = acidInfo; } - public HiveSplitCreator() { - this(null); - } - @Override - public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts, + public Split create(LocationPath path, long start, long length, long fileLength, + long modificationTime, String[] hosts, List<String> partitionValues) { return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java index fa24dc53e56..5e76996bb12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hudi.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; import org.apache.doris.spi.Split; @@ -210,14 +211,16 @@ public class COWIncrementalRelation implements IncrementalRelation { : Collections.emptyList(); for (String baseFile : filteredMetaBootstrapFullPaths) { HoodieWriteStat stat = fileToWriteStat.get(baseFile); - splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), - new String[0], + splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0, + stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), + 0, new String[0], 
HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); } for (String baseFile : filteredRegularFullPaths) { HoodieWriteStat stat = fileToWriteStat.get(baseFile); - splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), - new String[0], + splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0, + stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), + 0, new String[0], HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); } return splits; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 66c14446845..abd5a377f5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -240,7 +240,7 @@ public class HudiScanNode extends HiveScanNode { } } - public void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { + private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value()); THudiFileDesc fileDesc = new THudiFileDesc(); @@ -351,8 +351,7 @@ public class HudiScanNode extends HiveScanNode { long fileSize = baseFile.getFileSize(); // Need add hdfs host to location LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); - Path splitFilePath = locationPath.toStorageLocation(); - splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, + splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0, new String[0], partition.getPartitionValues())); }); } else { @@ -362,7 +361,7 @@ public class HudiScanNode extends HiveScanNode { } } - private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) { + private void getPartitionsSplits(List<HivePartition> partitions, List<Split> splits) { Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor(); CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); AtomicReference<Throwable> throwable = new AtomicReference<>(); @@ -397,7 +396,7 @@ public class HudiScanNode extends HiveScanNode { partitionInit = true; } List<Split> splits = Collections.synchronizedList(new ArrayList<>()); - getPartitionSplits(prunedPartitions, splits); + getPartitionsSplits(prunedPartitions, splits); return splits; } @@ -482,8 +481,8 @@ public class HudiScanNode extends HiveScanNode { // no base file, use log file to parse file type String agencyPath = filePath.isEmpty() ? 
logs.get(0) : filePath; - HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, - new String[0], partitionValues); + HudiSplit split = new HudiSplit(new LocationPath(agencyPath, hmsTable.getCatalogProperties()), + 0, fileSize, fileSize, new String[0], partitionValues); split.setTableFormatType(TableFormatType.HUDI); split.setDataFilePath(filePath); split.setHudiDeltaLogs(logs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java index 121dcf68005..c72f7621fea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java @@ -17,18 +17,18 @@ package org.apache.doris.datasource.hudi.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; import lombok.Data; -import org.apache.hadoop.fs.Path; import java.util.List; @Data public class HudiSplit extends FileSplit { - public HudiSplit(Path file, long start, long length, long fileLength, String[] hosts, + public HudiSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, List<String> partitionValues) { - super(file, start, length, fileLength, hosts, partitionValues); + super(file, start, length, fileLength, 0, hosts, partitionValues); } private String instantTime; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 56222d84955..2ca51298fe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; @@ -42,7 +41,6 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; import org.apache.doris.thrift.TPlanNode; @@ -51,7 +49,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DeleteFile; @@ -133,7 +130,7 @@ public class IcebergScanNode extends FileQueryScanNode { } } - public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { + private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); TIcebergFileDesc fileDesc = new TIcebergFileDesc(); @@ -147,8 +144,7 @@ public class IcebergScanNode extends FileQueryScanNode { TIcebergDeleteFileDesc deleteFileDesc 
= new TIcebergDeleteFileDesc(); String deleteFilePath = filter.getDeleteFilePath(); LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig()); - Path splitDeletePath = locationPath.toStorageLocation(); - deleteFileDesc.setPath(splitDeletePath.toString()); + deleteFileDesc.setPath(locationPath.toStorageLocation().toString()); if (filter instanceof IcebergDeleteFileFilter.PositionDelete) { IcebergDeleteFileFilter.PositionDelete positionDelete = (IcebergDeleteFileFilter.PositionDelete) filter; @@ -211,8 +207,6 @@ public class IcebergScanNode extends FileQueryScanNode { try (CloseableIterable<CombinedScanTask> combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { - String dataFilePath = normalizeLocation(splitTask.file().path().toString()); - List<String> partitionValues = new ArrayList<>(); if (isPartitionedTable) { StructLike structLike = splitTask.file().partition(); @@ -238,10 +232,10 @@ public class IcebergScanNode extends FileQueryScanNode { // Counts the number of partitions read partitionPathSet.add(structLike.toString()); } - LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toStorageLocation(); + String originalPath = splitTask.file().path().toString(); + LocationPath locationPath = new LocationPath(originalPath, source.getCatalog().getProperties()); IcebergSplit split = new IcebergSplit( - finalDataFilePath, + locationPath, splitTask.start(), splitTask.length(), splitTask.file().fileSizeInBytes(), @@ -249,7 +243,7 @@ public class IcebergScanNode extends FileQueryScanNode { formatVersion, source.getCatalog().getProperties(), partitionValues, - splitTask.file().path().toString()); + originalPath); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } @@ -311,36 +305,6 @@ public class IcebergScanNode extends FileQueryScanNode { return filters; } - @Override - public TFileType getLocationType() throws UserException { - String location = icebergTable.location(); - return getLocationType(location); - } - - @Override - public TFileType getLocationType(String location) throws UserException { - final String fLocation = normalizeLocation(location); - return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> - new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name())); - } - - private String normalizeLocation(String location) { - Map<String, String> props = source.getCatalog().getProperties(); - LocationPath locationPath = new LocationPath(location, props); - String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE); - if ("hadoop".equalsIgnoreCase(icebergCatalogType)) { - // if no scheme info, fill will HADOOP_FS_NAME - // if no HADOOP_FS_NAME, then should be local file system - if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) { - String fsName = props.get(HdfsResource.HADOOP_FS_NAME); - if (fsName != null) { - location = fsName + location; - } - } - } - return location; - } - @Override public TFileFormatType getFileFormatType() throws UserException { TFileFormatType type; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index d867245dbe3..8549e96bc2e 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -17,10 +17,10 @@ package org.apache.doris.datasource.iceberg.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; import lombok.Data; -import org.apache.hadoop.fs.Path; import java.util.List; import java.util.Map; @@ -28,21 +28,23 @@ import java.util.Map; @Data public class IcebergSplit extends FileSplit { + // Doris will convert the schema in FileSystem to achieve the function of natively reading files. + // For example, s3a:// will be converted to s3://. + // The position delete file of iceberg will record the full path of the datafile, which includes the schema. + // When comparing datafile with position delete, the converted path cannot be used, + // but the original datafile path must be used. private final String originalPath; + private Integer formatVersion; + private List<IcebergDeleteFileFilter> deleteFileFilters; + private Map<String, String> config; // File path will be changed if the file is modified, so there's no need to get modification time. - public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts, + public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, Integer formatVersion, Map<String, String> config, List<String> partitionList, String originalPath) { - super(file, start, length, fileLength, hosts, partitionList); + super(file, start, length, fileLength, 0, hosts, partitionList); this.formatVersion = formatVersion; this.config = config; this.originalPath = originalPath; } - - private Integer formatVersion; - private List<IcebergDeleteFileFilter> deleteFileFilters; - private Map<String, String> config; } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 6521ecd3101..ea651df9fef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -23,11 +23,10 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileQueryScanNode; -import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.TablePartitionValues; -import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; @@ -35,13 +34,12 @@ import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMaxComputeFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; import com.aliyun.odps.Table; import com.aliyun.odps.tunnel.TunnelException; -import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; import java.util.ArrayList; import 
java.util.Collection; @@ -53,8 +51,8 @@ import java.util.Map; public class MaxComputeScanNode extends FileQueryScanNode { private final MaxComputeExternalTable table; - private final MaxComputeExternalCatalog catalog; - public static final int MIN_SPLIT_SIZE = 4096; + private static final int MIN_SPLIT_SIZE = 4096; + private static final LocationPath VIRTUAL_SLICE_PART = new LocationPath("/virtual_slice_part", Maps.newHashMap()); public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv); @@ -64,7 +62,6 @@ public class MaxComputeScanNode extends FileQueryScanNode { StatisticalType statisticalType, boolean needCheckColumnPriv) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); table = (MaxComputeExternalTable) desc.getTable(); - catalog = (MaxComputeExternalCatalog) table.getCatalog(); } @Override @@ -74,7 +71,7 @@ public class MaxComputeScanNode extends FileQueryScanNode { } } - public void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) { + private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value()); TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc(); @@ -85,16 +82,6 @@ public class MaxComputeScanNode extends FileQueryScanNode { rangeDesc.setTableFormatParams(tableFormatFileDesc); } - @Override - protected TFileType getLocationType() throws UserException { - return getLocationType(null); - } - - @Override - protected TFileType getLocationType(String location) throws UserException { - return TFileType.FILE_NET; - } - @Override public TFileFormatType getFileFormatType() { return TFileFormatType.FORMAT_JNI; @@ -144,10 +131,8 @@ public class MaxComputeScanNode extends FileQueryScanNode { private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) { long modificationTime = odpsTable.getLastDataModifiedTime().getTime(); // use '-1' to read whole partition, avoid expending too much time on calling table.getTotalRows() - Pair<Long, Long> range = Pair.of(0L, -1L); - FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"), - range.first, range.second, -1, modificationTime, null, Collections.emptyList()); - result.add(new MaxComputeSplit(partitionSpec, rangeSplit)); + result.add(new MaxComputeSplit(VIRTUAL_SLICE_PART, + 0, -1L, -1, modificationTime, null, Collections.emptyList(), null)); } private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) { @@ -171,9 +156,8 @@ public class MaxComputeScanNode extends FileQueryScanNode { if (!sliceRange.isEmpty()) { for (int i = 0; i < sliceRange.size(); i++) { Pair<Long, Long> range = sliceRange.get(i); - FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i), - range.first, range.second, totalRows, modificationTime, null, Collections.emptyList()); - result.add(new MaxComputeSplit(rangeSplit)); + result.add(new MaxComputeSplit(new LocationPath("/virtual_slice_" + i, Maps.newHashMap()), + range.first, range.second, totalRows, modificationTime, null, Collections.emptyList(), null)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java index 20b285c4cfc..256ee1adefb 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java @@ -17,23 +17,22 @@ package org.apache.doris.datasource.maxcompute.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.thrift.TFileType; +import java.util.List; import java.util.Optional; public class MaxComputeSplit extends FileSplit { private final Optional<String> partitionSpec; - public MaxComputeSplit(FileSplit rangeSplit) { - super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength, - rangeSplit.hosts, rangeSplit.partitionValues); - this.partitionSpec = Optional.empty(); - } - - public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) { - super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength, - rangeSplit.hosts, rangeSplit.partitionValues); - this.partitionSpec = Optional.of(partitionSpec); + public MaxComputeSplit(LocationPath path, long start, long length, long fileLength, + long modificationTime, String[] hosts, List<String> partitionValues, String partitionSpec) { + super(path, start, length, fileLength, modificationTime, hosts, partitionValues); + this.partitionSpec = Optional.ofNullable(partitionSpec); + // MC always use FILE_NET type + this.locationType = TFileType.FILE_NET; } public Optional<String> getPartitionSpec() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 8ed8af15d86..27b40b5bcc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -35,19 +35,16 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TPaimonDeletionFileDesc; import org.apache.doris.thrift.TPaimonFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; @@ -147,7 +144,7 @@ public class PaimonScanNode extends FileQueryScanNode { } } - public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) { + private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value()); TPaimonFileDesc fileDesc = new TPaimonFileDesc(); @@ -214,10 +211,9 @@ public class PaimonScanNode extends FileQueryScanNode { DeletionFile deletionFile = deletionFiles.get(i); LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toStorageLocation(); try { List<Split> 
dorisSplits = splitFile( - finalDataFilePath, + locationPath, 0, null, file.length(), @@ -242,11 +238,10 @@ public class PaimonScanNode extends FileQueryScanNode { for (RawFile file : rawFiles) { LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toStorageLocation(); try { splits.addAll( splitFile( - finalDataFilePath, + locationPath, 0, null, file.length(), @@ -286,17 +281,6 @@ public class PaimonScanNode extends FileQueryScanNode { } } - @Override - public TFileType getLocationType() throws DdlException, MetaNotFoundException { - return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString()); - } - - @Override - public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException { - return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for paimon table ")); - } - @Override public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { return TFileFormatType.FORMAT_JNI; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 9ac44537e8a..44063e3226b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -33,7 +33,6 @@ import java.util.Map; public class PaimonSource { private final PaimonExternalTable paimonExtTable; private final Table originTable; - private final TupleDescriptor desc; public PaimonSource(PaimonExternalTable table, TupleDescriptor desc, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 6cca70577f8..ffd063d77e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -17,11 +17,12 @@ package org.apache.doris.datasource.paimon.source; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.SplitCreator; import org.apache.doris.datasource.TableFormatType; -import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.Split; @@ -29,20 +30,21 @@ import java.util.List; import java.util.Optional; public class PaimonSplit extends FileSplit { + private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap()); private Split split; private TableFormatType tableFormatType; private Optional<DeletionFile> optDeletionFile; public PaimonSplit(Split split) { - super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null); + super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); } - public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts, - List<String> partitionList) { - super(file, start, length, fileLength, hosts, partitionList); + private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime, + String[] hosts, List<String> partitionList) 
{ + super(file, start, length, fileLength, modificationTime, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); } @@ -51,10 +53,6 @@ public class PaimonSplit extends FileSplit { return split; } - public void setSplit(Split split) { - this.split = split; - } - public TableFormatType getTableFormatType() { return tableFormatType; } @@ -76,14 +74,14 @@ public class PaimonSplit extends FileSplit { static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); @Override - public org.apache.doris.spi.Split create(Path path, + public org.apache.doris.spi.Split create(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List<String> partitionValues) { - return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues); + return new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index 26b90c26a46..b0f0406c215 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; @@ -41,7 +42,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.Path; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -101,21 +102,16 @@ public class TVFScanNode extends FileQueryScanNode { @Override protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException { TFileCompressType fileCompressType = tableValuedFunction.getTFileCompressType(); - return Util.getOrInferCompressType(fileCompressType, fileSplit.getPath().toString()); + return Util.getOrInferCompressType(fileCompressType, fileSplit.getPathString()); } @Override - public TFileType getLocationType() throws DdlException, MetaNotFoundException { - return getLocationType(null); + protected boolean isFileStreamType() { + return tableValuedFunction.getTFileType() == TFileType.FILE_STREAM; } @Override - public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException { - return tableValuedFunction.getTFileType(); - } - - @Override - public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException { + public Map<String, String> getLocationProperties() { return tableValuedFunction.getLocationProperties(); } @@ -137,13 +133,14 @@ public class TVFScanNode extends FileQueryScanNode { } List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses(); for (TBrokerFileStatus fileStatus : fileStatuses) { - Path path = new Path(fileStatus.getPath()); + Map<String, String> prop = Maps.newHashMap(); try { - splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(), + splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), 
prop), fileStatus.getBlockSize(), + null, fileStatus.getSize(), fileStatus.getModificationTime(), fileStatus.isSplitable, null, FileSplitCreator.DEFAULT)); } catch (IOException e) { - LOG.warn("get file split failed for TVF: {}", path, e); + LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e); throw new UserException(e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java index efa9bd9b8f8..bbd278afef4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -118,7 +118,7 @@ public class HiveTableSink extends BaseExternalTableDataSink { THiveLocationParams locationParams = new THiveLocationParams(); LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties()); - String location = locationPath.toString(); + String location = locationPath.getPath().toString(); String storageLocation = locationPath.toStorageLocation().toString(); TFileType fileType = locationPath.getTFileTypeForBE(); if (fileType == TFileType.FILE_S3) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java index 0e01b599964..bfacb572305 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java @@ -133,7 +133,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink { // location LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps); tSink.setOutputPath(locationPath.toStorageLocation().toString()); - tSink.setOriginalOutputPath(locationPath.toString()); + tSink.setOriginalOutputPath(locationPath.getPath().toString()); tSink.setFileType(locationPath.getTFileTypeForBE()); if (insertCtx.isPresent()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 5113fa6dd5c..e4c52971813 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -71,7 +71,6 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTextSerdeType; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; @@ -97,23 +96,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio public static final String PROP_TABLE_ID = "table_id"; - protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>() - .add(FileFormatConstants.PROP_FORMAT) - .add(FileFormatConstants.PROP_JSON_ROOT) - .add(FileFormatConstants.PROP_JSON_PATHS) - .add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY) - .add(FileFormatConstants.PROP_READ_JSON_BY_LINE) - .add(FileFormatConstants.PROP_NUM_AS_STRING) - .add(FileFormatConstants.PROP_FUZZY_PARSE) - .add(FileFormatConstants.PROP_COLUMN_SEPARATOR) - .add(FileFormatConstants.PROP_LINE_DELIMITER) - .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES) - .add(FileFormatConstants.PROP_SKIP_LINES) - 
.add(FileFormatConstants.PROP_CSV_SCHEMA) - .add(FileFormatConstants.PROP_COMPRESS_TYPE) - .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS) - .build(); - // Columns got from file and path(if has) protected List<Column> columns = null; // User specified csv columns, it will override columns got from file diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 69130f57fff..23f052d6131 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -17,6 +17,8 @@ package org.apache.doris.common.util; +import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.common.util.LocationPath.Scheme; import org.apache.doris.fs.FileSystemType; import org.junit.jupiter.api.Assertions; @@ -63,8 +65,20 @@ public class LocationPathTest { Assertions.assertTrue(locationPath.get().startsWith("/dir") && !locationPath.get().startsWith("hdfs://")); Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://")); - } + props.clear(); + props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://test.com"); + locationPath = new LocationPath("/dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); + Assertions.assertEquals("hdfs://test.com/dir/file.path", locationPath.get()); + Assertions.assertEquals("hdfs://test.com/dir/file.path", locationPath.toStorageLocation().toString()); + props.clear(); + props.put(HdfsResource.HADOOP_FS_NAME, "oss://test.com"); + locationPath = new LocationPath("/dir/file.path", props); + Assertions.assertTrue(locationPath.get().startsWith("oss://")); + Assertions.assertEquals("oss://test.com/dir/file.path", locationPath.get()); + Assertions.assertEquals("s3://test.com/dir/file.path", locationPath.toStorageLocation().toString()); + } @Test public void testJFSLocationConvert() { @@ -171,7 +185,7 @@ public class LocationPathTest { LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps); // FE Assertions.assertTrue(locationPath.get().startsWith("unknown://")); - Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.UNKNOWN); + Assertions.assertTrue(locationPath.getScheme() == Scheme.UNKNOWN); // BE String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("unknown://")); @@ -184,7 +198,7 @@ public class LocationPathTest { LocationPath locationPath = new LocationPath("/path/to/local", rangeProps); // FE Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local")); - Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME); + Assertions.assertTrue(locationPath.getScheme() == Scheme.NOSCHEME); // BE String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index 82f46862674..57e64f0f223 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -20,6 +20,7 @@ package org.apache.doris.planner; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import 
org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FederationBackendPolicy; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.NodeSelectionStrategy; @@ -76,28 +77,28 @@ public class FederationBackendPolicyTest { }; List<Split> splits = new ArrayList<>(); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 112140970, 112140970, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 120839661, 120839661, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 108897409, 108897409, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 95795997, 95795997, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 105664025, 105664025, 0, null, Collections.emptyList())); @@ -141,28 +142,28 @@ public class FederationBackendPolicyTest { }; List<Split> splits = new ArrayList<>(); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 120839661, 120839661, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 108897409, 108897409, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new 
LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 105664025, 105664025, 0, null, Collections.emptyList())); @@ -178,11 +179,11 @@ public class FederationBackendPolicyTest { for (Split split : assignedSplits) { FileSplit fileSplit = (FileSplit) split; ++totalSplitNum; - if (fileSplit.getPath().equals(new Path( + if (fileSplit.getPath().getPath().equals(new Path( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { Assert.assertEquals("172.30.0.100", backend.getHost()); checkedLocalSplit.add(true); - } else if (fileSplit.getPath().equals(new Path( + } else if (fileSplit.getPath().getPath().equals(new Path( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { Assert.assertEquals("172.30.0.106", backend.getHost()); checkedLocalSplit.add(true); @@ -235,28 +236,28 @@ public class FederationBackendPolicyTest { }; List<Split> splits = new ArrayList<>(); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 112140970, 112140970, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 120839661, 120839661, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 108897409, 108897409, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 95795997, 95795997, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, 
Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 105664025, 105664025, 0, null, Collections.emptyList())); @@ -344,7 +345,7 @@ public class FederationBackendPolicyTest { int splitCount = random.nextInt(1000 - 100) + 100; for (int i = 0; i < splitCount; ++i) { long splitLength = random.nextInt(115343360 - 94371840) + 94371840; - FileSplit split = new FileSplit(new Path( + FileSplit split = new FileSplit(new LocationPath( "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), 0, splitLength, splitLength, 0, null, Collections.emptyList()); remoteSplits.add(split); @@ -364,7 +365,7 @@ public class FederationBackendPolicyTest { totalLocalHosts.add(localHost); } long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; - FileSplit split = new FileSplit(new Path( + FileSplit split = new FileSplit(new LocationPath( "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), Collections.emptyList()); @@ -467,7 +468,7 @@ public class FederationBackendPolicyTest { int splitCount = random.nextInt(1000 - 100) + 100; for (int i = 0; i < splitCount; ++i) { long splitLength = random.nextInt(115343360 - 94371840) + 94371840; - FileSplit split = new FileSplit(new Path( + FileSplit split = new FileSplit(new LocationPath( "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), 0, splitLength, splitLength, 0, null, Collections.emptyList()); remoteSplits.add(split); @@ -487,7 +488,7 @@ public class FederationBackendPolicyTest { totalLocalHosts.add(localHost); } long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840; - FileSplit split = new FileSplit(new Path( + FileSplit split = new FileSplit(new LocationPath( "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()), 0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]), Collections.emptyList()); @@ -604,28 +605,28 @@ public class FederationBackendPolicyTest { }; List<Split> splits = new ArrayList<>(); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 112140970, 112140970, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 120839661, 120839661, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( 
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 108897409, 108897409, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 95795997, 95795997, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 104600402, 104600402, 0, null, Collections.emptyList())); - splits.add(new FileSplit(new Path( + splits.add(new FileSplit(new LocationPath( "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), 0, 105664025, 105664025, 0, null, Collections.emptyList())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org