This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6af1c52e13 [Feature] add support for tencent chdfs (#8963)
6af1c52e13 is described below

commit 6af1c52e135de8b96f02942e0492313b446440f1
Author: wucheng <wucheng.xid...@foxmail.com>
AuthorDate: Tue Apr 12 16:02:42 2022 +0800

    [Feature] add support for tencent chdfs (#8963)
    
    Co-authored-by: chengwu <chen...@tencent.com>
---
 .../java/org/apache/doris/analysis/ExportStmt.java |   6 +-
 .../org/apache/doris/analysis/StorageBackend.java  |   3 +
 .../doris/broker/hdfs/FileSystemManager.java       | 133 +++++++++++++++++++++
 gensrc/thrift/Types.thrift                         |   1 +
 4 files changed, 140 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 48fb782d24..80dcf2798e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -236,9 +236,9 @@ public class ExportStmt extends StatementBase {
         String schema = uri.getScheme();
         if (type == StorageBackend.StorageType.BROKER) {
             if (schema == null || (!schema.equalsIgnoreCase("bos") && 
!schema.equalsIgnoreCase("afs")
-                    && !schema.equalsIgnoreCase("hdfs"))) {
-                throw new AnalysisException("Invalid export path. please use 
valid 'HDFS://', 'AFS://' or 'BOS://' " +
-                        "path.");
+                    && !schema.equalsIgnoreCase("hdfs") && 
!schema.equalsIgnoreCase("ofs"))) {
+                throw new AnalysisException("Invalid export path. please use 
valid 'HDFS://', 'AFS://' , 'BOS://', "
+                        + "or 'ofs://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 933153b728..70b10b4f06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -111,6 +111,7 @@ public class StorageBackend extends StorageDesc implements 
ParseNode {
     public enum StorageType {
         BROKER("Doris Broker"),
         S3("Amazon S3 Simple Storage Service"),
+        OFS("Tencent CHDFS"),
         // the following is not used currently
         HDFS("Hadoop Distributed File System"),
         LOCAL("Local file system");
@@ -132,6 +133,8 @@ public class StorageBackend extends StorageDesc implements 
ParseNode {
                     return TStorageBackendType.S3;
                 case HDFS:
                     return TStorageBackendType.HDFS;
+                case OFS:
+                    return TStorageBackendType.OFS;
                 case LOCAL:
                     return TStorageBackendType.LOCAL;
                 default:
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 2c00a2ae96..dc94720465 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -67,6 +67,7 @@ public class FileSystemManager {
     private static final String HDFS_SCHEME = "hdfs";
     private static final String S3A_SCHEME = "s3a";
     private static final String KS3_SCHEME = "ks3";
+    private static final String CHDFS_SCHEME = "ofs";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -163,6 +164,8 @@ public class FileSystemManager {
             brokerFileSystem = getS3AFileSystem(path, properties);
         } else if (scheme.equals(KS3_SCHEME)) {
             brokerFileSystem = getKS3FileSystem(path, properties);
+        } else if (scheme.equals(CHDFS_SCHEME)) {
+            brokerFileSystem = getChdfsFileSystem(path, properties);
         }else {
             throw new 
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
@@ -490,6 +493,136 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * Returns the cached {@link BrokerFileSystem} handle for Tencent CHDFS ("ofs://") paths,
+     * creating the underlying Hadoop {@link FileSystem} on first use. Visible for test.
+     *
+     * <p>The cache key does not contain the path's authority: with simple authentication all
+     * CHDFS paths share one handle; with kerberos the key is a digest of keytab + principal.
+     * NOTE(review): the Hadoop file system is created from the first path seen for a key and
+     * then reused for every later path with that key; confirm this is intended when several
+     * CHDFS mount points are in use.
+     *
+     * @param path CHDFS path; used to create the Hadoop file system when none is cached yet
+     * @param properties broker properties; every entry is copied into the Hadoop configuration
+     * @return the cached handle, or {@code null} when the entry was removed from the cache
+     *         concurrently (per the original comments, by the file system checker thread)
+     * @throws BrokerException INVALID_ARGUMENT when the authentication settings are invalid or
+     *         incomplete, NOT_AUTHORIZED when login or file system creation fails
+     */
+    public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String authentication = properties.getOrDefault(
+                CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, AUTHENTICATION_SIMPLE);
+        if (Strings.isNullOrEmpty(authentication) || (!authentication.equals(AUTHENTICATION_SIMPLE)
+                && !authentication.equals(AUTHENTICATION_KERBEROS))) {
+            logger.warn("invalid authentication:" + authentication);
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "invalid authentication:" + authentication);
+        }
+
+        boolean useKerberos = authentication.equals(AUTHENTICATION_KERBEROS);
+        FileSystemIdentity fileSystemIdentity = useKerberos
+                ? createChdfsKerberosIdentity(properties)
+                : new FileSystemIdentity(CHDFS_SCHEME, "");
+
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        BrokerFileSystem fileSystem = cachedFileSystem.get(fileSystemIdentity);
+        if (fileSystem == null) {
+            // it means it is removed concurrently by checker thread
+            return null;
+        }
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // this means the file system is closed by file system checker thread
+                // it is a corner case
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("create file system for new path " + path);
+                // The configuration is only needed when a new file system has to be created.
+                Configuration conf = new Configuration();
+                for (Map.Entry<String, String> propElement : properties.entrySet()) {
+                    conf.set(propElement.getKey(), propElement.getValue());
+                }
+                if (useKerberos) {
+                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                            AUTHENTICATION_KERBEROS);
+                    loginChdfsWithKerberos(conf, properties);
+                }
+                fileSystem.setFileSystem(FileSystem.get(pathUri.getUri(), conf));
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
+    /**
+     * Builds the cache identity for kerberos authentication from keytab (path or content) and
+     * principal.
+     *
+     * <p>The digest is hex-encoded. Decoding raw digest bytes with {@code new String(bytes)} is
+     * lossy under most charsets, so two different credentials could yield the same identity.
+     */
+    private FileSystemIdentity createChdfsKerberosIdentity(Map<String, String> properties) {
+        String keytab;
+        if (properties.containsKey(KERBEROS_KEYTAB)) {
+            keytab = properties.get(KERBEROS_KEYTAB);
+        } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+            keytab = properties.get(KERBEROS_KEYTAB_CONTENT);
+        } else {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "keytab is required for kerberos authentication");
+        }
+        if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "principal is required for kerberos authentication");
+        }
+        String credentials = keytab + properties.get(KERBEROS_PRINCIPAL);
+        try {
+            // Fully qualified: the file's import block is not part of this hunk.
+            byte[] digest = MessageDigest.getInstance("md5")
+                    .digest(credentials.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            StringBuilder hex = new StringBuilder(digest.length * 2);
+            for (byte b : digest) {
+                hex.append(String.format("%02x", b & 0xff));
+            }
+            return new FileSystemIdentity(CHDFS_SCHEME, hex.toString());
+        } catch (NoSuchAlgorithmException e) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, e);
+        }
+    }
+
+    /**
+     * Logs in through {@link UserGroupInformation} with the principal and keytab taken from
+     * {@code properties}.
+     *
+     * <p>A keytab passed as base64 content is written to a temporary file, because the login
+     * call takes a keytab path. That file is always deleted again, also when the login fails.
+     */
+    private void loginChdfsWithKerberos(Configuration conf, Map<String, String> properties)
+            throws java.io.IOException {
+        String principal = preparePrincipal(properties.get(KERBEROS_PRINCIPAL));
+        java.nio.file.Path tmpKeytab = null;
+        try {
+            String keytab;
+            if (properties.containsKey(KERBEROS_KEYTAB)) {
+                keytab = properties.get(KERBEROS_KEYTAB);
+            } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                byte[] decoded = Base64.getDecoder().decode(properties.get(KERBEROS_KEYTAB_CONTENT));
+                // createTempFile picks an unpredictable name (and, on POSIX systems, typically
+                // owner-only permissions) instead of a guessable "/tmp/.<millis>_<rand>" path.
+                tmpKeytab = java.nio.file.Files.createTempFile("doris_broker_", ".keytab");
+                java.nio.file.Files.write(tmpKeytab, decoded);
+                keytab = tmpKeytab.toString();
+            } else {
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                        "keytab is required for kerberos authentication");
+            }
+            // NOTE(review): both calls are static, so they presumably change process-wide
+            // login state; confirm this is acceptable alongside other cached file systems.
+            UserGroupInformation.setConfiguration(conf);
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+        } finally {
+            // Only a file created above is deleted; a keytab path supplied by the user is kept.
+            if (tmpKeytab != null && !tmpKeytab.toFile().delete()) {
+                logger.warn("delete tmp file:" + tmpKeytab + " failed");
+            }
+        }
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, 
Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 145cc45555..68a7160d7b 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -95,6 +95,7 @@ enum TTypeNodeType {
 enum TStorageBackendType {
     BROKER,
     S3,
     HDFS,
-    LOCAL
+    LOCAL,
+    OFS // appended last: implicit values follow position, so a mid-list insert would renumber HDFS and LOCAL
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to