This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-refactor_property
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-refactor_property by this push:
     new 71f66c0973f Enhance Object Storage Support and Add File System Connectivity Check (#49853)
71f66c0973f is described below

commit 71f66c0973fb817535f25d8c4b99dfc8acee5447
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Tue Apr 8 20:01:55 2025 +0800

    Enhance Object Storage Support and Add File System Connectivity Check (#49853)

    This pull request introduces several improvements and new features for object storage integration and file system management:

    ### Support for Object Storage Prefixes

    Added support for HTTPS, HTTP, S3, and other object storage parameter prefixes. Users can now integrate a given object storage system simply by supplying properties with the matching prefix, which makes the configuration more flexible and adaptable.

    ### Separation of TVF/LOAD Attributes

    Refactored the handling of TVF (Table-Valued Function) and LOAD attributes by separating them into distinct components. This improves maintainability and clarity and makes the configuration more modular and easier to manage.

    ### File System Connectivity Check

    Added a connectivity check for object storage file systems: before a load job is accepted, the target storage is probed through the new connectivityTest() API, so unreachable endpoints or bad credentials fail fast with a clear error message.
---
 .../java/org/apache/doris/analysis/BrokerDesc.java |  66 ++++++------
 .../java/org/apache/doris/analysis/LoadStmt.java   | 113 +++------------------
 .../org/apache/doris/analysis/OutFileClause.java   |  15 +--
 .../org/apache/doris/analysis/StorageBackend.java  |  18 +++-
 .../org/apache/doris/analysis/StorageDesc.java     |   9 ++
 .../java/org/apache/doris/backup/Repository.java   |  24 +++--
 .../datasource/property/ConnectionProperties.java  |   6 +-
 .../storage/AbstractObjectStorageProperties.java   |  36 +++++++
 .../datasource/property/storage/COSProperties.java |  22 ++--
 .../property/storage/HDFSProperties.java           |  17 ++++
 .../datasource/property/storage/OBSProperties.java |  25 ++---
 .../datasource/property/storage/OSSProperties.java |  23 ++---
 .../property/storage/ObjectStorageProperties.java  |  15 +--
 .../datasource/property/storage/S3Properties.java  |  35 +++----
 .../property/storage/S3PropertyUtils.java          |  90 ++++++++++++++++
 .../property/storage/StorageProperties.java        |   6 ++
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |   9 ++
 .../apache/doris/fs/remote/RemoteFileSystem.java   |   4 +
 .../org/apache/doris/fs/remote/S3FileSystem.java   |  13 +++
 .../tablefunction/HdfsTableValuedFunction.java     |   1 +
 .../doris/tablefunction/S3TableValuedFunction.java | 110 ++------------------
 .../backup_restore_cos.groovy                      |  72 ++++++++++---
 .../refactor_storage_param_p0/s3_load.groovy       |  18 ++--
 23 files changed, 391 insertions(+), 356 deletions(-)
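For illustration, a minimal sketch of how the prefix support is meant to be used, based on the factory and key names visible in the diff below (endpoint and credential values are placeholders):

    // Provider-prefixed keys ("cos.", "obs.", "oss.", "s3.", plus generic
    // "endpoint"/"region") select the matching StorageProperties subclass.
    Map<String, String> props = new HashMap<>();
    props.put("cos.endpoint", "cos.ap-guangzhou.myqcloud.com"); // placeholder endpoint
    props.put("cos.access_key", "<access-key>");
    props.put("cos.secret_key", "<secret-key>");

    StorageProperties storageProperties = StorageProperties.createStorageProperties(props);
    // Object storage implementations report themselves as S3-compatible:
    storageProperties.getStorageName(); // "S3"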
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index a68d9099558..c951a18dd39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -20,13 +20,11 @@ package org.apache.doris.analysis;
 import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.datasource.property.constants.BosProperties;
-import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TFileType;

@@ -54,6 +52,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
     // just for multi load
     public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
     public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
+    @Deprecated
     @SerializedName("cts3")
     private boolean convertedToS3 = false;

@@ -76,18 +75,11 @@ public class BrokerDesc extends StorageDesc implements Writable {
         if (properties != null) {
             this.properties.putAll(properties);
         }
-        if (isMultiLoadBroker()) {
-            this.storageType = StorageBackend.StorageType.LOCAL;
-        } else {
-            this.storageType = StorageBackend.StorageType.BROKER;
-        }
         this.storageProperties = StorageProperties.createStorageProperties(properties);
         // we should use storage properties?
         this.properties.putAll(storageProperties.getBackendConfigProperties());
-        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
-        if (this.convertedToS3) {
-            this.storageType = StorageBackend.StorageType.S3;
-        }
+        this.storageType = StorageBackend.StorageType.convertToStorageType(storageProperties.getStorageName());
+
     }

     public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
@@ -99,15 +91,38 @@ public class BrokerDesc extends StorageDesc implements Writable {
         this.storageProperties = StorageProperties.createStorageProperties(properties);
         // we should use storage properties?
         this.properties.putAll(storageProperties.getBackendConfigProperties());
-        this.storageType = storageType;
-        this.convertedToS3 = ObjectStorageProperties.class.isInstance(this.storageProperties);
-        if (this.convertedToS3) {
-            this.storageType = StorageBackend.StorageType.S3;
+        this.storageType = StorageBackend.StorageType.convertToStorageType(storageProperties.getStorageName());
+    }
+
+
+    public BrokerDesc(String name, Map<String, String> properties, StorageProperties storageProperties) {
+        this.name = name;
+        this.properties = Maps.newHashMap();
+        if (properties != null) {
+            this.properties.putAll(properties);
+        }
+        this.storageProperties = storageProperties;
+        this.storageType = StorageType.convertToStorageType(storageProperties.getStorageName());
+
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
+    }
+
+    public BrokerDesc(String name, StorageBackend.StorageType storageType,
+            Map<String, String> properties, StorageProperties storageProperties) {
+        this.name = name;
+        this.properties = Maps.newHashMap();
+        if (properties != null) {
+            this.properties.putAll(properties);
         }
+        this.storageProperties = storageProperties;
+        // we should use storage properties?
+        this.properties.putAll(storageProperties.getBackendConfigProperties());
+        this.storageType = StorageType.convertToStorageType(storageProperties.getStorageName());
     }

-    public String getFileLocation(String location) {
-        return this.convertedToS3 ? BosProperties.convertPathToS3(location) : location;
+    public String getFileLocation(String location) throws UserException {
+        return storageProperties.convertUrlToFilePath(location);
     }

     public static BrokerDesc createForStreamLoad() {
@@ -118,6 +133,7 @@
         return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
     }

+    //todo need convert from StorageProperties
     public TFileType getFileType() {
         switch (storageType) {
             case LOCAL:
@@ -136,10 +152,6 @@
         }
     }

-    public StorageBackend.StorageType storageType() {
-        return storageType;
-    }
-
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
@@ -155,17 +167,7 @@
             final String val = Text.readString(in);
             properties.put(key, val);
         }
-        StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
-        String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
-        if (typeStr != null) {
-            try {
-                st = StorageBackend.StorageType.valueOf(typeStr);
-            } catch (IllegalArgumentException e) {
-                LOG.warn("set to BROKER, because of exception", e);
-            }
-        }
         this.storageProperties = StorageProperties.createStorageProperties(properties);
-        storageType = st;
     }

     public static BrokerDesc read(DataInput in) throws IOException {
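The net effect of the BrokerDesc changes above: the storage type is now derived from the resolved StorageProperties rather than tracked through the deprecated convertedToS3 flag, and path conversion is delegated to the properties object. A sketch, with placeholder values (the exact output depends on S3URI parsing):

    Map<String, String> props = new HashMap<>();
    props.put("s3.endpoint", "s3.us-east-1.amazonaws.com"); // placeholder
    props.put("s3.access_key", "<access-key>");
    props.put("s3.secret_key", "<secret-key>");

    BrokerDesc desc = new BrokerDesc("broker", props);
    // storageType resolves via StorageType.convertToStorageType("S3").
    // Path normalization now goes through StorageProperties.convertUrlToFilePath
    // (throws UserException on unparsable paths):
    String path = desc.getFileLocation("https://bucket.s3.us-east-1.amazonaws.com/key");
    // expected: "s3://bucket/key"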
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 2863f7c8a18..ba835821a3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,23 +21,20 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
-import org.apache.doris.cloud.security.SecurityChecker;
-import org.apache.doris.cloud.storage.RemoteBase;
-import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.constants.AzureProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
+import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileType;

 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -49,8 +46,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;

 import java.io.File;
 import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -496,11 +491,11 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
             etlJobType = resourceDesc.getEtlJobType();
             // check resource usage privilege
             if (!Env.getCurrentEnv().getAccessManager().checkResourcePriv(ConnectContext.get(),
-                    resourceDesc.getName(),
-                    PrivPredicate.USAGE)) {
+                    resourceDesc.getName(),
+                    PrivPredicate.USAGE)) {
                 throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
-                        + "'@'" + ConnectContext.get().getRemoteIP()
-                        + "' for resource '" + resourceDesc.getName() + "'");
+                        + "'@'" + ConnectContext.get().getRemoteIP()
+                        + "' for resource '" + resourceDesc.getName() + "'");
             }
         } else if (brokerDesc != null) {
             etlJobType = EtlJobType.BROKER;
@@ -600,53 +595,18 @@
         }
     }

-    private void checkEndpoint(String endpoint) throws UserException {
-        HttpURLConnection connection = null;
-        try {
-            String urlStr = "http://" + endpoint;
-            SecurityChecker.getInstance().startSSRFChecking(urlStr);
-            URL url = new URL(urlStr);
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setConnectTimeout(10000);
-            connection.connect();
-        } catch (Exception e) {
-            LOG.warn("Failed to connect endpoint={}, err={}", endpoint, e);
-            String msg;
-            if (e instanceof UserException) {
-                msg = ((UserException) e).getDetailMessage();
-            } else {
-                msg = e.getMessage();
-            }
-            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
-                    "Failed to access object storage, message=" + msg, e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.disconnect();
-                } catch (Exception e) {
-                    LOG.warn("Failed to disconnect connection, endpoint={}, err={}", endpoint, e);
-                }
-            }
-            SecurityChecker.getInstance().stopSSRFChecking();
-        }
-    }
-
     public void checkS3Param() throws UserException {
-        Map<String, String> brokerDescProperties = brokerDesc.getProperties();
-        if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
-                && brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
-                && brokerDescProperties.containsKey(S3Properties.Env.SECRET_KEY)
-                && brokerDescProperties.containsKey(S3Properties.Env.REGION)) {
-            String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
-            endpoint = endpoint.replaceFirst("^http://", "");
-            endpoint = endpoint.replaceFirst("^https://", "");
-            brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
+        if (brokerDesc.getFileType() != null && brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
+
+            ObjectStorageProperties storageProperties = (ObjectStorageProperties) brokerDesc.getStorageProperties();
+            String endpoint = storageProperties.getEndpoint();
             checkWhiteList(endpoint);
-            if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
-                return;
+
+            //should add connectivity test
+            boolean connectivityTest = FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
+            if (!connectivityTest) {
+                throw new UserException("Failed to access object storage, message=connectivity test failed");
             }
-            checkEndpoint(endpoint);
-            checkAkSk();
         }
     }

@@ -659,47 +619,6 @@
         }
     }

-    private void checkAkSk() throws UserException {
-        RemoteBase remote = null;
-        ObjectInfo objectInfo = null;
-        String curFile = null;
-        try {
-            Map<String, String> brokerDescProperties = brokerDesc.getProperties();
-            String provider = getProviderFromEndpoint();
-            for (DataDescription dataDescription : dataDescriptions) {
-                for (String filePath : dataDescription.getFilePaths()) {
-                    curFile = filePath;
-                    String bucket = getBucketFromFilePath(filePath);
-                    objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
-                            brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
-                            brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
-                            bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
-                            brokerDescProperties.get(S3Properties.Env.REGION), "");
-                    remote = RemoteBase.newInstance(objectInfo);
-                    // RemoteBase#headObject does not throw exception if key does not exist.
-                    remote.headObject("1");
-                    remote.listObjects(null);
-                    remote.close();
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to access object storage, file={}, proto={}, err={}", curFile, objectInfo, e.toString());
-            String msg;
-            if (e instanceof UserException) {
-                msg = ((UserException) e).getDetailMessage();
-            } else {
-                msg = e.getMessage();
-            }
-            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
-                    "Failed to access object storage, message=" + msg, e);
-        } finally {
-            if (remote != null) {
-                remote.close();
-            }
-        }
-
-    }
-
     @Override
     public StmtType stmtType() {
         return StmtType.LOAD;
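With the LoadStmt changes above, the per-object AK/SK probe (checkAkSk) and the raw HTTP endpoint probe (checkEndpoint) are replaced by a single file-system-level check. The resulting flow, restated as a sketch:

    // Inside checkS3Param(), for S3-type loads only:
    // 1. the endpoint is read from the typed properties and whitelisted;
    // 2. a file system is built and probed once.
    RemoteFileSystem fs = FileSystemFactory.get(brokerDesc.getStorageProperties());
    if (!fs.connectivityTest()) {
        throw new UserException("Failed to access object storage, message=connectivity test failed");
    }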
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index ff49c8a80cd..1e80058ff51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -623,21 +623,8 @@ public class OutFileClause {
      */
     private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserException {
         String brokerName = properties.get(PROP_BROKER_NAME);
-        StorageBackend.StorageType storageType;
-        if (properties.containsKey(PROP_BROKER_NAME)) {
-            processedPropKeys.add(PROP_BROKER_NAME);
-            storageType = StorageBackend.StorageType.BROKER;
-        } else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
-            brokerName = StorageBackend.StorageType.S3.name();
-            storageType = StorageBackend.StorageType.S3;
-        } else if (filePath.toUpperCase().startsWith(HDFS_FILE_PREFIX.toUpperCase())) {
-            brokerName = StorageBackend.StorageType.HDFS.name();
-            storageType = StorageBackend.StorageType.HDFS;
-        } else {
-            return;
-        }
-        brokerDesc = new BrokerDesc(brokerName, storageType, properties);
+        brokerDesc = new BrokerDesc(brokerName, properties);
     }

     public static String getFsName(String path) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 67a76cec450..8ad671f7e88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -41,7 +41,7 @@ public class StorageBackend implements ParseNode {
         if (Strings.isNullOrEmpty(path)) {
             throw new AnalysisException(exceptionMsg == null ? "No destination path specified." : exceptionMsg);
         }
-        checkUri(URI.create(path), type);
+        //checkUri(URI.create(path), type);
     }

     public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException {
@@ -82,6 +82,7 @@ public class StorageBackend implements ParseNode {
         }
     }

+    // todo only bos used
     public StorageBackend(String storageName, String location,
             StorageType storageType, Map<String, String> properties) {
         this.storageDesc = new StorageDesc(storageName, storageType, properties);
@@ -185,6 +186,21 @@
                     return TStorageBackendType.BROKER;
             }
         }
+
+        public static StorageType convertToStorageType(String storageName) {
+            switch (storageName.toLowerCase()) {
+                case "hdfs":
+                    return StorageType.HDFS;
+                case "s3":
+                    return StorageType.S3;
+                case "jfs":
+                    return StorageType.JFS;
+                case "local":
+                    return StorageType.LOCAL;
+                default:
+                    throw new IllegalArgumentException("Invalid storage type: " + storageName);
+            }
+        }
     }
 }
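convertToStorageType above is the bridge between StorageProperties.getStorageName() and the legacy StorageType enum; unrecognized names fail fast rather than silently defaulting to BROKER. For example:

    StorageBackend.StorageType.convertToStorageType("s3");     // StorageType.S3
    StorageBackend.StorageType.convertToStorageType("HDFS");   // StorageType.HDFS (case-insensitive)
    StorageBackend.StorageType.convertToStorageType("broker"); // throws IllegalArgumentException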
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
index 28ebdff71b7..7c24f334edc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
@@ -35,6 +35,7 @@ import java.util.Map;
  * The broker's StorageBackend.StorageType desc
  */
 public class StorageDesc extends ResourceDesc {
+    @Deprecated
     @SerializedName("st")
     protected StorageBackend.StorageType storageType;

@@ -51,6 +52,14 @@ public class StorageDesc extends ResourceDesc {
         this.storageProperties = StorageProperties.createStorageProperties(properties);
     }

+    public StorageDesc(String name, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+        this.storageProperties = StorageProperties.createStorageProperties(properties);
+        this.storageType = StorageBackend.StorageType.convertToStorageType(storageProperties.getStorageName());
+
+    }
+
     public void setName(String name) {
         this.name = name;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index ff3f011ffed..a6ab4e98a18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
@@ -223,7 +224,14 @@ public class Repository implements Writable, GsonPostProcessable {
     }

     public String getLocation() {
-        return location;
+        if (null == fileSystem) {
+            return location;
+        }
+        try {
+            return fileSystem.getStorageProperties().convertUrlToFilePath(location);
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
     }

     public String getErrorMsg() {
@@ -271,7 +279,7 @@ public class Repository implements Writable, GsonPostProcessable {
             if (name.compareTo((String) root.get("name")) != 0) {
                 return new Status(ErrCode.COMMON_ERROR,
                         "Invalid repository __repo_info, expected repo '" + name + "', but get name '"
-                        + (String) root.get("name") + "' from " + repoInfoFilePath);
+                                + (String) root.get("name") + "' from " + repoInfoFilePath);
             }
             name = (String) root.get("name");
             createTime = TimeUtils.timeStringToLong((String) root.get("create_time"));
@@ -334,21 +342,21 @@ public class Repository implements Writable, GsonPostProcessable {

     // eg: location/__palo_repository_repo_name/__repo_info
     public String assembleRepoInfoFilePath() {
-        return Joiner.on(PATH_DELIMITER).join(location,
+        return Joiner.on(PATH_DELIMITER).join(getLocation(),
                 joinPrefix(PREFIX_REPO, name),
                 FILE_REPO_INFO);
     }

     // eg: location/__palo_repository_repo_name/__my_sp1/__meta
     public String assembleMetaInfoFilePath(String label) {
-        return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+        return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
                 joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                 FILE_META_INFO);
     }

     // eg: location/__palo_repository_repo_name/__my_sp1/__info_2018-01-01-08-00-00
     public String assembleJobInfoFilePath(String label, long createTime) {
-        return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+        return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
                 joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                 jobInfoFileNameWithTimestamp(createTime));
     }
@@ -356,7 +364,7 @@ public class Repository implements Writable, GsonPostProcessable {
     // eg:
     // __palo_repository_repo_name/__ss_my_ss1/__ss_content/__db_10001/__tbl_10020/__part_10031/__idx_10020/__10022/
     public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
-        String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+        String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
                 joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                 DIR_SNAPSHOT_CONTENT,
                 joinPrefix(PREFIX_DB, info.getDbId()),
@@ -375,7 +383,7 @@ public class Repository implements Writable, GsonPostProcessable {
     }

     public String getRepoPath(String label, String childPath) {
-        String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+        String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
                 joinPrefix(PREFIX_SNAPSHOT_DIR, label),
                 DIR_SNAPSHOT_CONTENT,
                 childPath);
@@ -631,7 +639,7 @@ public class Repository implements Writable, GsonPostProcessable {
         // 2. download
         status = fileSystem.downloadWithFileSize(remoteFilePathWithChecksum, localFilePath,
-                remoteFiles.get(0).getSize());
+                remoteFiles.get(0).getSize());
         if (!status.ok()) {
             return status;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
index 20e0a225b65..0ec219a2459 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
@@ -44,7 +44,6 @@ public abstract class ConnectionProperties {
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         // 2. overwrite result properties with original properties
         allProps.putAll(origProps);
-        Map<String, String> matchParams = new HashMap<>();
         // 3. set fields from resultProps
         List<Field> supportedProps = PropertyUtils.getConnectorProperties(this.getClass());
         for (Field field : supportedProps) {
@@ -55,7 +54,6 @@ public abstract class ConnectionProperties {
             if (allProps.containsKey(name)) {
                 try {
                     field.set(this, allProps.get(name));
-                    matchParams.put(name, allProps.get(name));
                 } catch (IllegalAccessException e) {
                     throw new RuntimeException("Failed to set property " + name + ", " + e.getMessage(), e);
                 }
@@ -65,13 +63,15 @@ public abstract class ConnectionProperties {
         }
         // 3. check properties
         checkRequiredProperties();
-        setOrigProps(matchParams);
     }

     // Some properties may be loaded from file
     // Subclass can override this method to load properties from file.
     // The return value is the properties loaded from file, not include original properties
     protected Map<String, String> loadConfigFromFile(String resourceConfig) {
+        if (Strings.isNullOrEmpty(resourceConfig)) {
+            return new HashMap<>();
+        }
         if (Strings.isNullOrEmpty(origProps.get(resourceConfig))) {
             return Maps.newHashMap();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
index 7d177d3bd3d..eaff23189a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractObjectStorageProperties.java
@@ -17,10 +17,12 @@

 package org.apache.doris.datasource.property.storage;

+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;

 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;

 import java.util.HashMap;
 import java.util.Map;
@@ -139,4 +141,38 @@ public abstract class AbstractObjectStorageProperties extends StorageProperties
         toNativeS3Configuration(config);
         return config;
     }
+
+
+    @Override
+    protected void normalizedAndCheckProps() {
+        super.normalizedAndCheckProps();
+        setEndpointIfNotSet();
+    }
+
+    private void setEndpointIfNotSet() {
+        if (StringUtils.isNotBlank(getEndpoint())) {
+            return;
+        }
+        String endpoint = S3PropertyUtils.constructEndpointFromUrl(origProps, usePathStyle, forceParsingByStandardUrl);
+        if (StringUtils.isBlank(endpoint)) {
+            throw new IllegalArgumentException("endpoint is required");
+        }
+        setEndpoint(endpoint);
+    }
+
+    @Override
+    public String convertUrlToFilePath(String uri) throws UserException {
+        return S3PropertyUtils.convertToS3Address(uri, getUsePathStyle(), getForceParsingByStandardUrl());
+
+    }
+
+    @Override
+    public String checkLoadPropsAndReturnUri(Map<String, String> loadProps) throws UserException {
+        return S3PropertyUtils.checkLoadPropsAndReturnUri(loadProps);
+    }
+
+    @Override
+    public String getStorageName() {
+        return "S3";
+    }
 }
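One consequence of setEndpointIfNotSet() above: when the endpoint property is omitted, it can be recovered from the "uri" property during property normalization. A sketch, assuming normalizedAndCheckProps() runs as part of construction (bucket and host values are placeholders):

    Map<String, String> props = new HashMap<>();
    props.put("uri", "https://my-bucket.cos.ap-guangzhou.myqcloud.com/path/file.csv"); // placeholder
    props.put("cos.access_key", "<access-key>");
    props.put("cos.secret_key", "<secret-key>");

    // No explicit endpoint: it is derived from the uri via
    // S3PropertyUtils.constructEndpointFromUrl(...). If neither an endpoint
    // property nor a parsable uri is present, construction fails with
    // IllegalArgumentException("endpoint is required").
    StorageProperties sp = StorageProperties.createStorageProperties(props);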
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index 997ed0e3b87..5d2d33ca60f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -20,7 +20,7 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;

 import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
+import lombok.Setter;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -28,12 +28,13 @@ import java.util.regex.Pattern;

 public class COSProperties extends AbstractObjectStorageProperties {

-    @ConnectorProperty(names = {"cos.endpoint", "s3.endpoint"},
+    @Setter
+    @ConnectorProperty(names = {"cos.endpoint", "endpoint", "s3.endpoint"},
             required = false,
             description = "The endpoint of COS.")
     protected String cosEndpoint = "";

-    @ConnectorProperty(names = {"cos.region", "s3.region"},
+    @ConnectorProperty(names = {"cos.region", "region", "s3.region"},
             required = false,
             description = "The region of COS.")
     protected String cosRegion = "ap-guangzhou";
@@ -55,16 +56,6 @@ public class COSProperties extends AbstractObjectStorageProperties {
         return origProps.containsKey("cos.access_key");
     }

-    @Override
-    public Configuration getHadoopConfiguration() {
-        Configuration conf = new Configuration(false);
-        conf.set("fs.cosn.bucket.region", getRegion());
-        conf.set("fs.cos.endpoint", cosEndpoint);
-        conf.set("fs.cosn.userinfo.secretId", cosAccessKey);
-        conf.set("fs.cosn.userinfo.secretKey", cosSecretKey);
-        conf.set("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem");
-        return conf;
-    }

     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
@@ -98,4 +89,9 @@ public class COSProperties extends AbstractObjectStorageProperties {
     public String getSecretKey() {
         return cosSecretKey;
     }
+
+    @Override
+    public void setEndpoint(String endpoint) {
+        this.cosEndpoint = endpoint;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
index c7a6a69b111..6b64df1817a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
@@ -17,6 +17,8 @@

 package org.apache.doris.datasource.property.storage;

+import org.apache.doris.common.NotImplementedException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;

 import com.google.common.base.Strings;
@@ -158,4 +160,19 @@ public class HDFSProperties extends StorageProperties {
         }
         return backendConfigProperties;
     }
+
+    @Override
+    public String convertUrlToFilePath(String url) throws UserException {
+        throw new NotImplementedException("Support HDFS is not implemented");
+    }
+
+    @Override
+    public String checkLoadPropsAndReturnUri(Map<String, String> loadProps) throws UserException {
+        throw new NotImplementedException("Support HDFS is not implemented");
+    }
+
+    @Override
+    public String getStorageName() {
+        return "HDFS";
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 45086365ae8..eeaa4984567 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -20,7 +20,7 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;

 import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
+import lombok.Setter;

 import java.util.Map;
 import java.util.regex.Matcher;
@@ -28,7 +28,9 @@ import java.util.regex.Pattern;

 public class OBSProperties extends AbstractObjectStorageProperties {

-    @ConnectorProperty(names = {"obs.endpoint", "s3.endpoint"}, required = false, description = "The endpoint of OBS.")
+    @Setter
+    @ConnectorProperty(names = {"obs.endpoint", "endpoint", "s3.endpoint"}, required = false,
+            description = "The endpoint of OBS.")
     protected String obsEndpoint = "obs.cn-east-3.myhuaweicloud.com";

     @ConnectorProperty(names = {"obs.access_key"}, description = "The access key of OBS.")
@@ -38,7 +40,8 @@ public class OBSProperties extends AbstractObjectStorageProperties {
     protected String obsSecretKey = "";

-    @ConnectorProperty(names = {"obs.region", "s3.region"}, required = false, description = "The region of OBS.")
+    @ConnectorProperty(names = {"obs.region", "region", "s3.region"}, required = false,
+            description = "The region of OBS.")
     protected String region;

     public OBSProperties(Map<String, String> origProps) {
@@ -50,17 +53,6 @@ public class OBSProperties extends AbstractObjectStorageProperties {
         return origProps.containsKey("obs.access_key");
     }

-
-    @Override
-    public Configuration getHadoopConfiguration() {
-        Configuration conf = new Configuration(false);
-        conf.set("fs.obs.endpoint", obsEndpoint);
-        conf.set("fs.obs.access.key", obsAccessKey);
-        conf.set("fs.obs.secret.key", obsSecretKey);
-        conf.set("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
-        return conf;
-    }
-
     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
         config.putAll(generateAWSS3Properties(obsEndpoint, getRegion(), obsAccessKey, obsSecretKey));
@@ -88,4 +80,9 @@ public class OBSProperties extends AbstractObjectStorageProperties {
     public String getSecretKey() {
         return obsSecretKey;
     }
+
+    @Override
+    public void setEndpoint(String endpoint) {
+        this.obsEndpoint = endpoint;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 625eb395328..1353139da2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -20,15 +20,18 @@ package org.apache.doris.datasource.property.storage;
 import org.apache.doris.datasource.property.ConnectorProperty;

 import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
+import lombok.Setter;

 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

 public class OSSProperties extends AbstractObjectStorageProperties {
-    @ConnectorProperty(names = {"oss.endpoint", "s3.endpoint"}, required = false, description = "The endpoint of OSS.")
-    protected String endpoint = "oss-cn-hangzhou.aliyuncs.com";
+
+    @Setter
+    @ConnectorProperty(names = {"oss.endpoint", "endpoint", "s3.endpoint"}, required = false,
+            description = "The endpoint of OSS.")
+    protected String endpoint = "";

     @ConnectorProperty(names = {"oss.access_key"}, description = "The access key of OSS.")
     protected String accessKey = "";
@@ -36,7 +39,8 @@ public class OSSProperties extends AbstractObjectStorageProperties {
     @ConnectorProperty(names = {"oss.secret_key"}, description = "The secret key of OSS.")
     protected String secretKey = "";

-    @ConnectorProperty(names = {"oss.region", "s3.region"}, required = false, description = "The region of OSS.")
+    @ConnectorProperty(names = {"oss.region", "region", "s3.region"}, required = false,
+            description = "The region of OSS.")
     protected String region;

@@ -48,17 +52,6 @@ public class OSSProperties extends AbstractObjectStorageProperties {
         return origProps.containsKey("oss.access_key");
     }

-    @Override
-    public Configuration getHadoopConfiguration() {
-        Configuration conf = new Configuration(false);
-        conf.set("fs.oss.endpoint", endpoint);
-        conf.set("fs.oss.region", getRegion());
-        conf.set("fs.oss.accessKeyId", accessKey);
-        conf.set("fs.oss.accessKeySecret", secretKey);
-        conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
-        return conf;
-    }
-
     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
         config.putAll(generateAWSS3Properties(endpoint, getRegion(), accessKey, secretKey));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
index 22694293e25..6800f6afabd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/ObjectStorageProperties.java
@@ -17,8 +17,6 @@

 package org.apache.doris.datasource.property.storage;

-import org.apache.hadoop.conf.Configuration;
-
 import java.util.Map;

 /**
@@ -28,16 +26,6 @@ import java.util.Map;
  */
 public interface ObjectStorageProperties {

-    /**
-     * Converts the object storage properties to a configuration map compatible with the
-     * Hadoop HDFS protocol. This method allows the object storage to be used in a Hadoop-based
-     * environment by providing the necessary configuration details in the form of key-value pairs.
-     *
-     * @param config a map to populate with the HDFS-compatible configuration parameters.
-     *               These parameters will be used by Hadoop clients to connect to the object storage system.
-     */
-    Configuration getHadoopConfiguration();
-
     /**
      * Converts the object storage properties to a configuration map compatible with the
      * AWS S3 protocol. This method provides the necessary configuration parameters for connecting
@@ -56,4 +44,7 @@ public interface ObjectStorageProperties {
     String getAccessKey();

     String getSecretKey();
+
+    void setEndpoint(String endpoint);
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 0e200108634..d17504ba3de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -22,7 +22,6 @@ import org.apache.doris.datasource.property.metastore.AWSGlueProperties;
 import org.apache.doris.datasource.property.metastore.AliyunDLFProperties;

 import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.options.Options;

 import java.lang.reflect.Field;
@@ -32,19 +31,22 @@ import java.util.Map;

 public class S3Properties extends AbstractObjectStorageProperties {

-    @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT"},
+
+    @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "access_key"},
+            required = false,
             description = "The endpoint of S3.")
     protected String s3Endpoint = "";

-    @ConnectorProperty(names = {"s3.region", "AWS_REGION"},
+    @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "region"},
+            required = false,
             description = "The region of S3.")
     protected String s3Region = "us-east-1";

-    @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY"},
+    @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
             description = "The access key of S3.")
     protected String s3AccessKey = "";

-    @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY"},
+    @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", "secret_key"},
             description = "The secret key of S3.")
     protected String s3SecretKey = "";

@@ -95,6 +97,7 @@ public class S3Properties extends AbstractObjectStorageProperties {
         super(Type.S3, origProps);
     }

+
     /**
      * Guess if the storage properties is for this storage type.
      * Subclass should override this method to provide the correct implementation.
@@ -102,9 +105,6 @@
      * @return
      */
     public static boolean guessIsMe(Map<String, String> origProps) {
-        if (origProps.containsKey("s3.access_key") || origProps.containsKey("AWS_ACCESS_KEY")) {
-            return true;
-        }
         List<Field> fields = getIdentifyFields();
         return StorageProperties.checkIdentifierKey(origProps, fields);
     }
@@ -144,20 +144,6 @@ public class S3Properties extends AbstractObjectStorageProperties {
         catalogProps.put("s3.path-style-access", usePathStyle);
     }

-    @Override
-    public Configuration getHadoopConfiguration() {
-        Configuration conf = new Configuration(false);
-        conf.set("fs.s3a.access.key", s3AccessKey);
-        conf.set("fs.s3a.secret.key", s3SecretKey);
-        conf.set("fs.s3a.endpoint", s3Endpoint);
-        conf.set("fs.s3a.region", s3Region);
-        conf.set("fs.s3a.connection.maximum", String.valueOf(s3ConnectionMaximum));
-        conf.set("fs.s3a.connection.timeout", String.valueOf(s3ConnectionRequestTimeoutS));
-        conf.set("fs.s3a.request.timeout", String.valueOf(s3ConnectionTimeoutS));
-        conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
-        return conf;
-    }
-
     @Override
     public void toNativeS3Configuration(Map<String, String> config) {
         Map<String, String> awsS3Properties = generateAWSS3Properties(s3Endpoint, s3Region, s3AccessKey, s3SecretKey,
@@ -191,4 +177,9 @@ public class S3Properties extends AbstractObjectStorageProperties {
     public String getSecretKey() {
         return s3SecretKey;
     }
+
+    @Override
+    public void setEndpoint(String endpoint) {
+        this.s3Endpoint = endpoint;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
new file mode 100644
index 00000000000..ce4c3b15c89
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3URI;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+
+public class S3PropertyUtils {
+
+
+    private static final String URI_KEY = "uri";
+
+    public static String constructEndpointFromUrl(Map<String, String> props,
+            String stringUsePathStyle, String stringForceParsingByStandardUri) {
+        String uri = props.get(URI_KEY);
+        if (uri == null || uri.isEmpty()) {
+            return null;
+        }
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = Boolean.parseBoolean(stringForceParsingByStandardUri);
+        try {
+            S3URI s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+            return s3uri.getEndpoint().orElse(null);
+        } catch (UserException e) {
+            return null;
+        }
+    }
+
+    public static String constructRegionFromUrl(Map<String, String> props, String stringUsePathStyle,
+            String stringForceParsingByStandardUri) {
+        String uri = props.get(URI_KEY);
+        if (uri == null || uri.isEmpty()) {
+            return null;
+        }
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = Boolean.parseBoolean(stringForceParsingByStandardUri);
+
+        S3URI s3uri = null;
+        try {
+            s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+            return s3uri.getRegion().orElse(null);
+        } catch (UserException e) {
+            return null;
+        }
+    }
+
+    public static String convertToS3Address(String path, String stringUsePathStyle,
+            String stringForceParsingByStandardUri) throws UserException {
+        if (StringUtils.isBlank(path)) {
+            throw new UserException("path is null");
+        }
+        if (path.startsWith("s3://")) {
+            return path;
+        }
+
+        boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
+        boolean forceParsingByStandardUri = Boolean.parseBoolean(stringForceParsingByStandardUri);
+        S3URI s3uri = S3URI.create(path, usePathStyle, forceParsingByStandardUri);
+        return "s3" + S3URI.SCHEME_DELIM + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
+    }
+
+    public static String checkLoadPropsAndReturnUri(Map<String, String> props) throws UserException {
+        if (props.isEmpty()) {
+            throw new UserException("props is empty");
+        }
+        if (!props.containsKey(URI_KEY)) {
+            throw new UserException("props must contain uri");
+        }
+        return props.get(URI_KEY);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 8fdc486b7e8..555a878937f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -155,4 +155,10 @@ public abstract class StorageProperties extends ConnectionProperties {
         return false;
     }

+    public abstract String convertUrlToFilePath(String url) throws UserException;
+
+
+    public abstract String checkLoadPropsAndReturnUri(Map<String, String> loadProps) throws UserException;
+
+    public abstract String getStorageName();
 }
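The behavior of S3PropertyUtils.convertToS3Address above, in short: s3:// paths pass through unchanged, anything else is parsed by S3URI and rewritten to the canonical s3://bucket/key form. A sketch, assuming S3URI parses virtual-hosted-style URLs (which the usePathStyle flag suggests):

    // Already canonical: returned as-is.
    S3PropertyUtils.convertToS3Address("s3://my-bucket/a/b.csv", "false", "false");
    // -> "s3://my-bucket/a/b.csv"

    // Virtual-hosted-style HTTPS URL: bucket and key extracted by S3URI.
    S3PropertyUtils.convertToS3Address(
            "https://my-bucket.s3.us-east-1.amazonaws.com/a/b.csv", "false", "false");
    // -> "s3://my-bucket/a/b.csv"

    // Blank paths are rejected with UserException("path is null").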
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 8d391d9bb2d..b66f0019956 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -114,6 +114,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
         try {
+            remotePath = s3Properties.convertUrlToFilePath(remotePath);
             URI uri = new URI(remotePath);
             String bucketName = uri.getHost();
             String prefix = uri.getPath().substring(1);
@@ -162,6 +163,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status headObject(String remotePath) {
         try {
+            remotePath = s3Properties.convertUrlToFilePath(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
             HeadObjectResponse response = getClient()
                     .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
@@ -183,6 +185,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status getObject(String remoteFilePath, File localFile) {
         try {
+            remoteFilePath = s3Properties.convertUrlToFilePath(remoteFilePath);
             S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, forceParsingByStandardUri);
             GetObjectResponse response = getClient().getObject(
                     GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
@@ -203,6 +206,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status putObject(String remotePath, @Nullable InputStream content, long contentLength) {
         try {
+            remotePath = s3Properties.convertUrlToFilePath(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
             RequestBody body = RequestBody.fromInputStream(content, contentLength);
             PutObjectResponse response =
@@ -224,6 +228,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status deleteObject(String remotePath) {
         try {
+            remotePath = s3Properties.convertUrlToFilePath(remotePath);
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
             DeleteObjectResponse response =
                     getClient()
@@ -246,6 +251,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status deleteObjects(String absolutePath) {
         try {
+            absolutePath = s3Properties.convertUrlToFilePath(absolutePath);
             S3URI baseUri = S3URI.create(absolutePath, isUsePathStyle, forceParsingByStandardUri);
             String continuationToken = "";
             boolean isTruncated = false;
@@ -291,6 +297,8 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public Status copyObject(String origFilePath, String destFilePath) {
         try {
+            origFilePath = s3Properties.convertUrlToFilePath(origFilePath);
+            destFilePath = s3Properties.convertUrlToFilePath(destFilePath);
             S3URI origUri = S3URI.create(origFilePath, isUsePathStyle, forceParsingByStandardUri);
             S3URI descUri = S3URI.create(destFilePath, isUsePathStyle, forceParsingByStandardUri);
             CopyObjectResponse response = getClient()
@@ -314,6 +322,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     @Override
     public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException {
         try {
+            absolutePath = s3Properties.convertUrlToFilePath(absolutePath);
             S3URI uri = S3URI.create(absolutePath, isUsePathStyle, forceParsingByStandardUri);
             String bucket = uri.getBucket();
             String prefix = uri.getKey();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 08b7e1cde78..9f613b79fd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -139,4 +139,8 @@ public abstract class RemoteFileSystem extends PersistentFileSystem implements C
             fsLock.unlock();
         }
     }
+
+    public boolean connectivityTest() throws UserException {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 8ebf0d4d01e..4b48041dba3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -72,6 +72,19 @@ public class S3FileSystem extends ObjFileSystem {
         return objStorage.globList(remotePath, result, fileNameOnly);
     }

+    @Override
+    public boolean connectivityTest() throws UserException {
+        S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
+        try {
+            objStorage.getClient().listBuckets();
+            return true;
+        } catch (Exception e) {
+            LOG.error("S3 connectivityTest error", e);
+        }
+        return false;
+
+    }
+
     @VisibleForTesting
     public HadoopAuthenticator getAuthenticator() {
         return authenticator;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
index 80149e3d138..37984981486 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
@@ -55,6 +55,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
             throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
         }
         URI uri = URI.create(uriStr);
+        //fixme refactor in HDFSStorageProperties
         StorageBackend.checkUri(uri, StorageType.HDFS);
         filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
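RemoteFileSystem.connectivityTest() above defaults to true, so only backends with a cheap probe need to override it; S3FileSystem uses listBuckets() as that probe. A subclass wanting a different probe would follow the same shape (class and probe method here are hypothetical, for illustration only):

    public class MyFileSystem extends ObjFileSystem { // hypothetical subclass
        @Override
        public boolean connectivityTest() throws UserException {
            try {
                // Any cheap, credential-exercising call works as a probe.
                probeRemoteService(); // hypothetical
                return true;
            } catch (Exception e) {
                LOG.error("connectivityTest error", e);
                return false;
            }
        }
    }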
"ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION"); - public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException { // 1. analyze common properties - Map<String, String> otherProps = super.parseCommonProperties(properties); - - // 2. analyze uri and other properties - String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); - if (Strings.isNullOrEmpty(uriStr)) { - throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); - } - forwardCompatibleDeprecatedKeys(otherProps); - - String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, - PropertyConverter.USE_PATH_STYLE_DEFAULT_VALUE); - String forceParsingByStandardUri = getOrDefaultAndRemove(otherProps, - PropertyConverter.FORCE_PARSING_BY_STANDARD_URI, - PropertyConverter.FORCE_PARSING_BY_STANDARD_URI_DEFAULT_VALUE); - - S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()), - Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase())); - String endpoint = constructEndpoint(otherProps, s3uri); - if (StringUtils.isNotBlank(endpoint)) { - otherProps.putIfAbsent(S3Properties.ENDPOINT, endpoint); - } - if (!otherProps.containsKey(S3Properties.REGION)) { - String region; - if (AzureProperties.checkAzureProviderPropertyExist(properties)) { - // Azure could run without region - region = s3uri.getRegion().orElse("DUMMY-REGION"); - } else { - region = s3uri.getRegion().orElse(null); - } - if (StringUtils.isNotBlank(region)) { - otherProps.put(S3Properties.REGION, region); - } - } - // get endpoint first from properties, if not present, get it from s3 uri. - // If endpoint is missing, exception will be thrown. - locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle); - if (AzureProperties.checkAzureProviderPropertyExist(properties)) { - //todo waiting support for Azure - // For Azure's compatibility, we need bucket to connect to the blob storage's container - locationProperties.put(S3Properties.BUCKET, s3uri.getBucket()); - } - Map<String, String> p = new HashMap<>(properties); - p.putAll(otherProps); - this.storageProperties = StorageProperties.createStorageProperties(p); + Map<String, String> props = super.parseCommonProperties(properties); + //todo rename locationProperties + this.storageProperties = StorageProperties.createStorageProperties(props); locationProperties.putAll(storageProperties.getBackendConfigProperties()); - locationProperties.putAll(otherProps); - - filePath = NAME + S3URI.SCHEME_DELIM + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey(); - + try { + String uri = storageProperties.checkLoadPropsAndReturnUri(props); + filePath = storageProperties.convertUrlToFilePath(uri); + } catch (UserException e) { + throw new RuntimeException(e); + } if (FeConstants.runningUnitTest) { // Just check FileSystemFactory.get(storageProperties); @@ -115,43 +62,6 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { } } - private String constructEndpoint(Map<String, String> properties, S3URI s3uri) throws AnalysisException { - String endpoint; - if (!AzureProperties.checkAzureProviderPropertyExist(properties)) { - // get endpoint first from properties, if not present, get it from s3 uri. - // If endpoint is missing, exception will be thrown. 
- endpoint = getOrDefaultAndRemove(properties, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse("")); - /*if (Strings.isNullOrEmpty(endpoint)) { - throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT)); - }*/ - } else { - String bucket = s3uri.getBucket(); - String accountName = properties.getOrDefault(S3Properties.ACCESS_KEY, ""); - if (accountName.isEmpty()) { - throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY)); - } - endpoint = String.format(AzureProperties.AZURE_ENDPOINT_TEMPLATE, accountName, bucket); - } - return endpoint; - } - - private void forwardCompatibleDeprecatedKeys(Map<String, String> props) { - for (String deprecatedKey : DEPRECATED_KEYS) { - String value = props.remove(deprecatedKey); - if (!Strings.isNullOrEmpty(value)) { - props.put("s3." + deprecatedKey.toLowerCase(), value); - } - } - } - - private S3URI getS3Uri(String uri, boolean isPathStyle, boolean forceParsingStandardUri) throws AnalysisException { - try { - return S3URI.create(uri, isPathStyle, forceParsingStandardUri); - } catch (UserException e) { - throw new AnalysisException("parse s3 uri failed, uri = " + uri, e); - } - } - // =========== implement abstract methods of ExternalFileTableValuedFunction ================= @Override public TFileType getTFileType() { @@ -166,7 +76,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { @Override public BrokerDesc getBrokerDesc() { - return new BrokerDesc("S3TvfBroker", StorageType.S3, locationProperties); + return new BrokerDesc("S3TvfBroker", locationProperties, storageProperties); } // =========== implement abstract methods of TableValuedFunctionIf ================= diff --git a/regression-test/suites/refactor_storage_param_p0/backup_restore_cos.groovy b/regression-test/suites/refactor_storage_param_p0/backup_restore_cos.groovy index f9d3368c913..9ab43c86284 100644 --- a/regression-test/suites/refactor_storage_param_p0/backup_restore_cos.groovy +++ b/regression-test/suites/refactor_storage_param_p0/backup_restore_cos.groovy @@ -184,23 +184,17 @@ suite("refactor_storage_backup_restore_cos") { def dbName6 = currentDBName + "${objPrefix}_6" createDBAndTbl("${dbName6}") backupAndRestore("${s3repoName6}", dbName6, s3table, "backup_${s3repoName6}_test") + def s3repoName7 = "${objPrefix}_s3_repo_7" + createRepository("${s3repoName7}", "s3.endpoint", s3_endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "", "https://${bucket}/test_" + System.currentTimeMillis()) + def dbName7 = currentDBName + "${objPrefix}_7" + + createDBAndTbl("${dbName7}") + backupAndRestore("${s3repoName7}", dbName7, s3table, "backup_${s3repoName7}_test") def failedRepoName = "s3_repo_failed" // wrong address shouldFail { createRepository("${failedRepoName}", "s3.endpoint", s3_endpoint, "s3.region", region, "AWS_ACCESS_KEY", ak, "AWS_SECRET_KEY", sk, "true", "s3://ck/" + System.currentTimeMillis()) } - - shouldFail { - createRepository("${failedRepoName}", "s3.endpoint", s3_endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "", "https://${bucket}/test_" + System.currentTimeMillis()) - } - // http://${bucket}/test_"+System.currentTimeMillis() - shouldFail { - createRepository("${failedRepoName}", "s3.endpoint", s3_endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "", "http://${bucket}/test_" + System.currentTimeMillis()) - } - // https://${bucket}/test_"+System.currentTimeMillis() - shouldFail { - 
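After this refactor, the S3 TVF constructor reduces to four steps: parse common properties, build StorageProperties, require the uri property (checkLoadPropsAndReturnUri), and normalize it (convertUrlToFilePath). A sketch of that resolution with placeholder values:

    Map<String, String> props = new HashMap<>();
    props.put("uri", "https://my-bucket.s3.us-east-1.amazonaws.com/data/x.csv"); // placeholder
    props.put("s3.access_key", "<access-key>");
    props.put("s3.secret_key", "<secret-key>");

    StorageProperties sp = StorageProperties.createStorageProperties(props);
    String uri = sp.checkLoadPropsAndReturnUri(props); // throws if "uri" is missing
    String filePath = sp.convertUrlToFilePath(uri);    // expected: "s3://my-bucket/data/x.csv"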
createRepository("${failedRepoName}", "s3.endpoint", s3_endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "", "https://${bucket}/test_" + System.currentTimeMillis()) - } //endpoint is empty shouldFail { createRepository("${failedRepoName}", "s3.endpoint", "", "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "", "s3://${bucket}/test_" + System.currentTimeMillis()) @@ -210,14 +204,14 @@ suite("refactor_storage_backup_restore_cos") { createRepository("${failedRepoName}", "s3.endpoint", "", "s3.region", "", "s3.access_key", ak, "s3.secret_key", sk, "", "s3://${bucket}/test_" + System.currentTimeMillis()) } } + /*-------------AWS S3--------------------------------*/ String ak = context.config.otherConfigs.get("AWSAK") String sk = context.config.otherConfigs.get("AWSSK") String s3_endpoint = "s3.ap-northeast-1.amazonaws.com" String region = "ap-northeast-1" String bucket = "selectdb-qa-datalake-test" String objPrefix="s3" - /*-------------AWS S3--------------------------------*/ - //test_backup_restore(ak,sk,s3_endpoint,region,bucket,objPrefix) + test_backup_restore(ak,sk,s3_endpoint,region,bucket,objPrefix) /*-----------------Tencent COS----------------*/ ak = context.config.otherConfigs.get("txYunAk") sk = context.config.otherConfigs.get("txYunSk") @@ -227,6 +221,24 @@ suite("refactor_storage_backup_restore_cos") { objPrefix="cos" test_backup_restore(ak,sk,s3_endpoint,region,bucket,objPrefix) + /* cos_url */ + def cos_repoName1 = "${objPrefix}_repo_cos_prefix_1" + // url is : cos://bucket/prefix/ + createRepository("${cos_repoName1}", "cos.endpoint", s3_endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "true", "cos://${bucket}/test_" + System.currentTimeMillis()) + + def cosDbName1 = currentDBName + "${objPrefix}_cos_1" + createDBAndTbl("${cosDbName1}") + backupAndRestore("${cos_repoName1}", cosDbName1, s3table, "backup_${cos_repoName1}_test") + def cos_repoName2 = "${objPrefix}_repo_cos_prefix_2" + // url is : cos://bucket/prefix/ + createRepository("${cos_repoName2}", "cos.endpoint", s3_endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "false", "https://${bucket}.${s3_endpoint}/test_" + System.currentTimeMillis()) + + def cosDbName2 = currentDBName + "${objPrefix}_cos_2" + createDBAndTbl("${cosDbName2}") + backupAndRestore("${cos_repoName2}", cosDbName2, s3table, "backup_${cos_repoName1}_test") + + + /*-----------------Huawei OBS----------------*/ ak = context.config.otherConfigs.get("hwYunAk") sk = context.config.otherConfigs.get("hwYunSk") @@ -235,6 +247,22 @@ suite("refactor_storage_backup_restore_cos") { bucket = "doris-build"; objPrefix="obs" test_backup_restore(ak,sk,s3_endpoint,region,bucket,objPrefix) + def obs_repoName1 = "${objPrefix}_repo_obs_prefix_1" + // url is : cos://bucket/prefix/ + createRepository("${obs_repoName1}", "obs.endpoint", s3_endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "true", "obs://${bucket}/test_" + System.currentTimeMillis()) + + def obsDbName1 = currentDBName + "${objPrefix}_obs_1" + createDBAndTbl("${obsDbName1}") + backupAndRestore("${obs_repoName1}", obsDbName1, s3table, "backup_${obs_repoName1}_test") + def obs_repoName2 = "${objPrefix}_repo_obs_prefix_2" + // url is : cos://bucket/prefix/ + createRepository("${obs_repoName2}", "obs.endpoint", s3_endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "false", "https://${bucket}.${s3_endpoint}/test_" + System.currentTimeMillis()) + + def obsDbName2 = currentDBName + 
"${objPrefix}_obs_2" + createDBAndTbl("${obsDbName2}") + backupAndRestore("${obs_repoName2}", obsDbName2, s3table, "backup_${obs_repoName1}_test") + + /*-----------------Aliyun OSS----------------*/ ak = context.config.otherConfigs.get("aliYunAk") sk = context.config.otherConfigs.get("aliYunSk") @@ -244,4 +272,20 @@ suite("refactor_storage_backup_restore_cos") { objPrefix="oss" // oss has some problem, so we comment it. //test_backup_restore(ak,sk,s3_endpoint,region,bucket,objPrefix) + def oss_repoName1 = "${objPrefix}_repo_oss_prefix_1" + // url is : cos://bucket/prefix/ + createRepository("${oss_repoName1}", "oss.endpoint", s3_endpoint, "oss.region", region, "oss.access_key", ak, "oss.secret_key", sk, "false", "oss://${bucket}/test_" + System.currentTimeMillis()) + + def ossDbName1 = currentDBName + "${objPrefix}_oss_1" + createDBAndTbl("${ossDbName1}") + backupAndRestore("${oss_repoName1}", ossDbName1, s3table, "backup_${oss_repoName1}_test") + def oss_repoName2 = "${objPrefix}_repo_oss_prefix_2" + // url is : cos://bucket/prefix/ + createRepository("${oss_repoName2}", "oss.endpoint", s3_endpoint, "oss.region", region, "oss.access_key", ak, "oss.secret_key", sk, "false", "https://${bucket}.${s3_endpoint}/test_" + System.currentTimeMillis()) + + def ossDbName2 = currentDBName + "${objPrefix}_oss_2" + createDBAndTbl("${ossDbName2}") + backupAndRestore("${oss_repoName2}", ossDbName2, s3table, "backup_${oss_repoName1}_test") + + } \ No newline at end of file diff --git a/regression-test/suites/refactor_storage_param_p0/s3_load.groovy b/regression-test/suites/refactor_storage_param_p0/s3_load.groovy index 62990050ae0..27814356220 100644 --- a/regression-test/suites/refactor_storage_param_p0/s3_load.groovy +++ b/regression-test/suites/refactor_storage_param_p0/s3_load.groovy @@ -129,9 +129,8 @@ suite("refactor_storage_param_load") { s3Load("s3://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "false") s3Load("s3://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "") s3Load("s3://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "AWS_ACCESS_KEY", ak, "AWS_SECRET_KEY", sk, "") - s3Load("http://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "true") - s3Load("http://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "") - s3Load("https://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "") + s3Load("http://${bucket}.${endpoint}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "false") + s3Load("http://${bucket}.${endpoint}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "") s3Load("https://${bucket}${filePath}", bucket, "s3.endpoint", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "false") shouldFail { s3Load("https://${bucket}${filePath}", bucket, "", endpoint, "s3.region", region, "s3.access_key", ak, "s3.secret_key", sk, "false") @@ -172,10 +171,8 @@ suite("refactor_storage_param_load") { s3Load("obs://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "false") s3Load("obs://${bucket}${filePath}", bucket, 
"obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") s3Load("s3://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") - s3Load("http://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") - s3Load("https://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") - s3Load("http://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "true") - s3Load("http://${bucket}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "false") + s3Load("http://${bucket}.${endpoint}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") + s3Load("https://${bucket}.${endpoint}${filePath}", bucket, "obs.endpoint", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "") shouldFail { s3Load("https://${bucket}${filePath}", bucket, "", endpoint, "obs.region", region, "obs.access_key", ak, "obs.secret_key", sk, "false") } @@ -217,10 +214,9 @@ suite("refactor_storage_param_load") { s3Load("cos://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "false") s3Load("cos://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") s3Load("s3://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") - s3Load("http://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") - s3Load("https://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") - s3Load("http://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "true") - s3Load("http://${bucket}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "false") + s3Load("http://${bucket}.${endpoint}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") + s3Load("https://${bucket}.${endpoint}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "") + s3Load("http://${bucket}.${endpoint}${filePath}", bucket, "cos.endpoint", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "false") shouldFail { s3Load("https://${bucket}${filePath}", bucket, "", endpoint, "cos.region", region, "cos.access_key", ak, "cos.secret_key", sk, "false") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org