This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 7ccd504932  [Broker](bos) support baidu bos object storage for broker (#15448)
7ccd504932 is described below

commit 7ccd5049325515d21489eaee506084f47d3a4101
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Fri Dec 30 12:39:10 2022 +0800

    [Broker](bos) support baidu bos object storage for broker (#15448)
---
 .../java/org/apache/doris/analysis/ExportStmt.java |   8 +-
 .../doris/broker/hdfs/FileSystemIdentity.java      |  17 +++
 .../doris/broker/hdfs/FileSystemManager.java       | 119 +++++++++++++++++++--
 3 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 077729c81e..62fb2c9977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -236,14 +236,16 @@ public class ExportStmt extends StatementBase {
         URI uri = URI.create(path);
         String schema = uri.getScheme();
         if (type == StorageBackend.StorageType.BROKER) {
-            if (schema == null || (!schema.equalsIgnoreCase("hdfs")
+            if (schema == null || (!schema.equalsIgnoreCase("bos")
+                    && !schema.equalsIgnoreCase("afs")
+                    && !schema.equalsIgnoreCase("hdfs")
                     && !schema.equalsIgnoreCase("ofs")
                     && !schema.equalsIgnoreCase("obs")
                     && !schema.equalsIgnoreCase("oss")
                     && !schema.equalsIgnoreCase("s3a")
                     && !schema.equalsIgnoreCase("cosn"))) {
-                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
-                        + "'oss://', 's3a://' or 'cosn://' path.");
+                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+                        + " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
index 9446638a52..2885ba1a58 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
@@ -21,10 +21,19 @@ public class FileSystemIdentity {

     private final String hostName;
     private final String ugiInfo;
+
+    private final String extraInfo;

     public FileSystemIdentity(String hostName, String ugiInfo) {
         this.hostName = hostName;
         this.ugiInfo = ugiInfo;
+        this.extraInfo = null;
+    }
+
+    public FileSystemIdentity(String hostName, String ugiInfo, String extraInfo) {
+        this.hostName = hostName;
+        this.ugiInfo = ugiInfo;
+        this.extraInfo = extraInfo;
     }

     @Override
@@ -34,6 +43,7 @@ public class FileSystemIdentity {
         result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
         result = prime * result + ((ugiInfo == null) ? 0 : ugiInfo.hashCode());
+        result = prime * result + ((extraInfo == null) ? 0 : extraInfo.hashCode());
         return result;
     }

@@ -63,6 +73,13 @@ public class FileSystemIdentity {
         } else if (!ugiInfo.equals(other.ugiInfo)) {
             return false;
         }
+        if (extraInfo == null) {
+            if (other.extraInfo != null) {
+                return false;
+            }
+        } else if (!extraInfo.equals(other.extraInfo)) {
+            return false;
+        }
         return true;
     }
 }
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 42ff97123b..27b548af8c 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -42,6 +42,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -71,6 +72,8 @@ public class FileSystemManager {
     private static final String OBS_SCHEME = "obs";
     private static final String OSS_SCHEME = "oss";
     private static final String COS_SCHEME = "cosn";
+    private static final String BOS_SCHEME = "bos";
+    private static final String AFS_SCHEME = "afs";

     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -132,6 +135,23 @@ public class FileSystemManager {
     private static final String FS_COS_IMPL = "fs.cosn.impl";
     private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";

+    // arguments for bos
+    private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
+    private static final String FS_BOS_SECRET_KEY = "fs.bos.secret.access.key";
+    private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
+    private static final String FS_BOS_IMPL = "fs.bos.impl";
+    private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";
+
+
+    // arguments for afs
+    private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
+    private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
+    private static final String FS_DEFAULT_NAME = "fs.default.name";
+    private static final String FS_AFS_IMPL = "fs.afs.impl";
+    private static final String DFS_AGENT_PORT = "dfs.agent.port";
+    private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
+    private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
+
     private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);

     private int readBufferSize = 128 << 10; // 128k
@@ -197,6 +217,8 @@ public class FileSystemManager {
             brokerFileSystem = getOSSFileSystem(path, properties);
         } else if (scheme.equals(COS_SCHEME)) {
             brokerFileSystem = getCOSFileSystem(path, properties);
+        } else if (scheme.equals(BOS_SCHEME)) {
+            brokerFileSystem = getBOSFileSystem(path, properties);
         } else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                     "invalid path. scheme is not supported");
@@ -548,8 +570,8 @@ public class FileSystemManager {
         String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
         String disableCache = properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
         String host = OSS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
-        String obsUgi = accessKey + "," + secretKey;
-        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi);
+        String ossUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ossUgi);
         cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
         BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
         fileSystem.getLock().lock();
@@ -608,11 +630,11 @@ public class FileSystemManager {
         } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
             kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
         } else {
-            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, 
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
                     "keytab is required for kerberos authentication");
         }
         if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
-            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, 
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
                     "principal is required for kerberos authentication");
         } else {
             kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
@@ -651,8 +673,8 @@ public class FileSystemManager {
         // pass kerberos keytab content use base64 encoding
         // so decode it and write it to tmp path under /tmp
         // because ugi api only accept a local path as argument
-        String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
-        byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
+        String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+        byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent);
         long currentTime = System.currentTimeMillis();
         Random random = new Random(currentTime);
         int randNumber = random.nextInt(10000);
@@ -737,6 +759,91 @@ public class FileSystemManager {
         }
     }

+    /**
+     * visible for test
+     * <p>
+     * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
+     *
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getBOSFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_BOS_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_BOS_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_BOS_ENDPOINT, "");
+        String multiPartUploadBlockSize = properties.getOrDefault(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, "9437184");
+        // endpoint is the server host, pathUri.getUri().getHost() is the bucket
+        // we should use these two params as the host identity, because FileSystem will cache both.
+        String host = BOS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+        String bosUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, bosUgi);
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_BOS_ACCESS_KEY, accessKey);
+                conf.set(FS_BOS_SECRET_KEY, secretKey);
+                conf.set(FS_BOS_ENDPOINT, endpoint);
+                conf.set(FS_BOS_IMPL, "org.apache.hadoop.fs.bos.BaiduBosFileSystem");
+                conf.set(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, multiPartUploadBlockSize);
+                FileSystem bosFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(bosFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
+    private BrokerFileSystem getAfsFileSystem(String path, Map<String, String> properties) {
+        URI pathUri = new WildcardURI(path).getUri();
+        String host = pathUri.getScheme() + "://" + pathUri.getAuthority();
+        String username = properties.containsKey(USER_NAME_KEY) ? properties.get(USER_NAME_KEY) : "";
+        String password = properties.containsKey(PASSWORD_KEY) ? properties.get(PASSWORD_KEY) : "";
+        String group = properties.containsKey(HADOOP_JOB_GROUP_NAME) ? properties.get(HADOOP_JOB_GROUP_NAME) : "";
+        String afsUgi = username + "," + password;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, afsUgi, group);
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // this means the file system is closed by file system checker thread
+                // it is a corner case
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new file system
+                Configuration conf = new Configuration();
+                conf.set(HADOOP_JOB_UGI, afsUgi);
+                conf.set(HADOOP_JOB_GROUP_NAME, group);
+                conf.set(FS_DEFAULT_NAME, host);
+                conf.set(FS_AFS_IMPL, "org.apache.hadoop.fs.DFileSystem");
+                conf.set(DFS_CLIENT_AUTH_METHOD, properties.getOrDefault(DFS_CLIENT_AUTH_METHOD, "3"));
+                conf.set(DFS_AGENT_PORT, properties.getOrDefault(DFS_AGENT_PORT, "20001"));
+                conf.set(DFS_RPC_TIMEOUT, properties.getOrDefault(DFS_RPC_TIMEOUT, "300000"));
+                FileSystem dfsFileSystem = FileSystem.get(URI.create(host), conf);
+                fileSystem.setFileSystem(dfsFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
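
The ExportStmt hunk above gates broker export paths on the URI scheme, which now also accepts "bos" and "afs". As a hedged illustration (the path below is made up, not from the commit), java.net.URI yields both the scheme that the whitelist checks and the host that the BOS code path later treats as the bucket:

    import java.net.URI;

    public class SchemeCheckDemo {
        public static void main(String[] args) {
            // Mirrors the whitelist check in ExportStmt; "bos" now passes.
            URI uri = URI.create("bos://my-bucket/doris/export/");
            System.out.println(uri.getScheme()); // prints "bos"
            System.out.println(uri.getHost());   // prints "my-bucket", used as the bucket name
        }
    }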
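
The three-argument FileSystemIdentity constructor exists so the AFS group name can take part in cache lookups. A minimal sketch of those semantics, using only the constructors and equals contract shown in the diff (hosts and credentials are placeholders):

    import org.apache.doris.broker.hdfs.FileSystemIdentity;

    public class FileSystemIdentityDemo {
        public static void main(String[] args) {
            // Two AFS identities that differ only in group (extraInfo) must not
            // share a cached FileSystem, since hadoop.job.group.name is baked
            // into the Configuration.
            FileSystemIdentity a = new FileSystemIdentity("afs://host:9902", "user,passwd", "group-a");
            FileSystemIdentity b = new FileSystemIdentity("afs://host:9902", "user,passwd", "group-b");
            System.out.println(a.equals(b)); // false: extraInfo differs

            // The two-argument constructor leaves extraInfo null, so existing
            // callers (hdfs, s3a, oss, cosn, ...) keep their old cache keys.
            FileSystemIdentity c = new FileSystemIdentity("bos://bj.bcebos.com/my-bucket", "ak,sk");
            FileSystemIdentity d = new FileSystemIdentity("bos://bj.bcebos.com/my-bucket", "ak,sk");
            System.out.println(c.equals(d)); // true: same host, same ugi, both extraInfo null
        }
    }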
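
getBOSFileSystem is documented as "visible for test", so a test-style call is the most direct way to show which properties the new code path reads. The sketch below assumes FileSystemManager can be created with a no-argument constructor, as broker tests typically do; the endpoint, bucket, and keys are placeholders, and fs.bos.multipart.uploads.block.size falls back to 9437184 bytes (9 MB) when unset:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.doris.broker.hdfs.BrokerFileSystem;
    import org.apache.doris.broker.hdfs.FileSystemManager;

    public class BosBrokerDemo {
        public static void main(String[] args) {
            // Property keys come straight from the diff; values are placeholders.
            Map<String, String> properties = new HashMap<>();
            properties.put("fs.bos.access.key", "<access-key>");
            properties.put("fs.bos.secret.access.key", "<secret-key>");
            properties.put("fs.bos.endpoint", "bj.bcebos.com");

            FileSystemManager manager = new FileSystemManager(); // assumed no-arg constructor
            // Cache identity = "bos://" + endpoint + "/" + bucket, plus "ak,sk",
            // so repeated calls with the same endpoint, bucket, and keys reuse
            // one underlying Hadoop FileSystem.
            BrokerFileSystem fs = manager.getBOSFileSystem("bos://my-bucket/doris/load/file.csv", properties);
            System.out.println(fs != null);
        }
    }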