This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-refactor_property
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-refactor_property by this push:
     new 3896b94db41 [feat](storage)Refactor HDFS integration: move basic checks to storage layer and add tests (#49932)
3896b94db41 is described below

commit 3896b94db41297b91f4a70aa0fbc3f8b73bb76bf
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Sat Apr 12 10:25:36 2025 +0800

    [feat](storage)Refactor HDFS integration: move basic checks to storage layer and add tests (#49932)
    
    ### What problem does this PR solve?
    
    Refactored HDFS parameter integration to align with the updated
    configuration structure.
    
    Moved basic validation and parameter transformation logic down to the
    HDFS-specific storage layer, keeping the business layer clean and
    storage-agnostic.
    
    Maintained consistent behavior at the business layer, ensuring it does
    not need to handle any storage-specific logic.
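    
    For illustration, a minimal caller-side sketch of the refactored flow; the
    StorageProperties method names are taken from the diff below, while the
    wrapper class and the sample property values are hypothetical:
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.doris.common.UserException;
    import org.apache.doris.datasource.property.storage.StorageProperties;
    
    public class HdfsStoragePropsSketch {
        public static void main(String[] args) throws UserException {
            // Hypothetical input: a "uri" plus a hadoop.* key, which lets
            // HDFSProperties.guessIsMe() route the props to the HDFS storage layer.
            Map<String, String> props = new HashMap<>();
            props.put("uri", "hdfs://nameservice1/user/doris/data.csv");
            props.put("hadoop.security.authentication", "simple");
    
            // The business layer only uses the generic StorageProperties API;
            // URI validation and fs.defaultFS derivation happen in the HDFS layer.
            StorageProperties storage = StorageProperties.createStorageProperties(props);
            String uri = storage.checkLoadPropsAndReturnUri(props);
            String filePath = storage.convertUrlToFilePath(uri);
            Map<String, String> backendConf = storage.getBackendConfigProperties();
            System.out.println(filePath + " / backend props: " + backendConf.size());
        }
    }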
---
 .../doris/common/CatalogConfigFileUtils.java       |   2 +-
 fe/fe-core/pom.xml                                 |  10 +
 .../java/org/apache/doris/analysis/LoadStmt.java   |  21 --
 .../property/storage/HDFSProperties.java           |  37 ++-
 .../property/storage/HdfsPropertiesUtils.java      | 100 +++++++
 .../datasource/property/storage/S3Properties.java  |   6 +-
 .../property/storage/StorageProperties.java        |   2 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |   7 +-
 .../tablefunction/HdfsTableValuedFunction.java     |  38 +--
 .../refactor_storage_param_p0/hdfs_all_test.groovy | 318 +++++++++++++++++++++
 10 files changed, 475 insertions(+), 66 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
index e6fe4c7f10c..6a1c552d972 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
@@ -52,7 +52,7 @@ public class CatalogConfigFileUtils {
         // Iterate over the comma-separated list of resource files.
         for (String resource : resourcesPath.split(",")) {
             // Construct the full path to the resource file.
-            String resourcePath = configDir + File.separator + resource.trim();
+            String resourcePath = configDir + resource.trim();
             File file = new File(resourcePath);
 
             // Check if the file exists and is a regular file; if not, throw an exception.
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 371481a2e91..3cfd3b64739 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -573,6 +573,16 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
 
         <!-- lakesoul -->
         <dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index ba835821a3e..6b92bee2a56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.load.EtlJobType;
@@ -520,26 +519,6 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
     }
 
 
-    private String getProviderFromEndpoint() {
-        Map<String, String> properties = brokerDesc.getProperties();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
-                // S3 Provider properties should be case insensitive.
-                return entry.getValue().toUpperCase();
-            }
-        }
-        return S3Properties.S3_PROVIDER;
-    }
-
-    private String getBucketFromFilePath(String filePath) throws Exception {
-        String[] parts = filePath.split("\\/\\/");
-        if (parts.length < 2) {
-            throw new Exception("filePath is not valid");
-        }
-        String buckt = parts[1].split("\\/")[0];
-        return buckt;
-    }
-
     public String getComment() {
         return comment;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
index 6b64df1817a..159a46b7e88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java
@@ -17,12 +17,12 @@
 
 package org.apache.doris.datasource.property.storage;
 
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.ConnectorProperty;
 
 import com.google.common.base.Strings;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashMap;
@@ -61,6 +61,9 @@ public class HDFSProperties extends StorageProperties {
             description = "Whether to enable the impersonation of HDFS.")
     private boolean hdfsImpersonationEnabled = false;
 
+    @ConnectorProperty(names = {"fs.defaultFS"}, required = false, description 
= "")
+    private String fsDefaultFS = "";
+
     /**
      * The final HDFS configuration map that determines the effective settings.
      * Priority rules:
@@ -72,17 +75,26 @@ public class HDFSProperties extends StorageProperties {
 
     public HDFSProperties(Map<String, String> origProps) {
         super(Type.HDFS, origProps);
-        // to be care     setOrigProps(matchParams);
         loadFinalHdfsConfig(origProps);
     }
 
+    public static boolean guessIsMe(Map<String, String> props) {
+        if (MapUtils.isEmpty(props)) {
+            return false;
+        }
+        if (props.containsKey("hadoop.config.resources") || props.containsKey("hadoop.security.authentication")) {
+            return true;
+        }
+        return false;
+    }
+
     private void loadFinalHdfsConfig(Map<String, String> origProps) {
         if (MapUtils.isEmpty(origProps)) {
             return;
         }
         finalHdfsConfig = new HashMap<>();
         origProps.forEach((key, value) -> {
-            if (key.startsWith("hadoop.") || key.startsWith("dfs.")) {
+            if (key.startsWith("hadoop.") || key.startsWith("dfs.") || key.equals("fs.defaultFS")) {
                 finalHdfsConfig.put(key, value);
             }
         });
@@ -98,12 +110,14 @@ public class HDFSProperties extends StorageProperties {
         super.checkRequiredProperties();
         checkConfigFileIsValid(hadoopConfigResources);
         if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
-            if (Strings.isNullOrEmpty(hdfsKerberosPrincipal)
-                    || Strings.isNullOrEmpty(hdfsKerberosKeytab)) {
+            if (Strings.isNullOrEmpty(hdfsKerberosPrincipal) || Strings.isNullOrEmpty(hdfsKerberosKeytab)) {
                 throw new IllegalArgumentException("HDFS authentication type is kerberos, "
                         + "but principal or keytab is not set.");
             }
         }
+        if (StringUtils.isBlank(fsDefaultFS)) {
+            this.fsDefaultFS = HdfsPropertiesUtils.constructDefaultFsFromUri(origProps);
+        }
     }
 
     private void checkConfigFileIsValid(String configFile) {
@@ -119,6 +133,9 @@ public class HDFSProperties extends StorageProperties {
         if (MapUtils.isNotEmpty(finalHdfsConfig)) {
             finalHdfsConfig.forEach(conf::set);
         }
+        if (StringUtils.isNotBlank(fsDefaultFS)) {
+            conf.set("fs.defaultFS", fsDefaultFS);
+        }
         //todo waiting be support should use new params
         conf.set("hdfs.security.authentication", hdfsAuthenticationType);
         if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
@@ -131,20 +148,20 @@ public class HDFSProperties extends StorageProperties {
     }
 
     public Configuration getHadoopConfiguration() {
-        Configuration conf = new Configuration(false);
+        Configuration conf = new Configuration(true);
         Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName());
         allProps.forEach(conf::set);
         if (MapUtils.isNotEmpty(finalHdfsConfig)) {
             finalHdfsConfig.forEach(conf::set);
         }
-        conf.set("hdfs.security.authentication", hdfsAuthenticationType);
+        /* conf.set("hadoop.kerberos.authentication", hdfsAuthenticationType);
         if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
             conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal);
             conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab);
         }
         if (!Strings.isNullOrEmpty(hadoopUsername)) {
             conf.set("hadoop.username", hadoopUsername);
-        }
+        }*/
 
         return conf;
     }
@@ -163,12 +180,12 @@ public class HDFSProperties extends StorageProperties {
 
     @Override
     public String convertUrlToFilePath(String url) throws UserException {
-        throw new NotImplementedException("Support HDFS is not implemented");
+        return HdfsPropertiesUtils.convertUrlToFilePath(url);
     }
 
     @Override
     public String checkLoadPropsAndReturnUri(Map<String, String> loadProps) throws UserException {
-        throw new NotImplementedException("Support HDFS is not implemented");
+        return HdfsPropertiesUtils.checkLoadPropsAndReturnUri(loadProps);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
new file mode 100644
index 00000000000..f605c977ec1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.URI;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class HdfsPropertiesUtils {
+    private static final String URI_KEY = "uri";
+
+    private static Set<String> supportSchema = new HashSet<>();
+
+    static {
+        supportSchema.add("hdfs");
+        supportSchema.add("viewfs");
+    }
+
+    public static String checkLoadPropsAndReturnUri(Map<String, String> props) throws UserException {
+        if (props.isEmpty()) {
+            throw new UserException("props is empty");
+        }
+        if (!props.containsKey(URI_KEY)) {
+            throw new UserException("props must contain uri");
+        }
+        String uriStr = props.get(URI_KEY);
+        return convertAndCheckUri(uriStr);
+    }
+
+
+    public static String convertUrlToFilePath(String uriStr) throws UserException {
+
+        return convertAndCheckUri(uriStr);
+    }
+
+    public static String constructDefaultFsFromUri(Map<String, String> props) {
+        if (props.isEmpty()) {
+            return null;
+        }
+        if (!props.containsKey(URI_KEY)) {
+            return null;
+        }
+        String uriStr = props.get(URI_KEY);
+        if (StringUtils.isBlank(uriStr)) {
+            return null;
+        }
+        URI uri = null;
+        try {
+            uri = URI.create(uriStr);
+        } catch (AnalysisException e) {
+            return null;
+        }
+        String schema = uri.getScheme();
+        if (StringUtils.isBlank(schema)) {
+            throw new IllegalArgumentException("Invalid uri: " + uriStr + 
"extract schema is null");
+        }
+        if (!supportSchema.contains(schema.toLowerCase())) {
+            throw new IllegalArgumentException("Invalid export path:"
+                    + schema + " , please use valid 'hdfs://' or 'viewfs://' path.");
+        }
+        return uri.getScheme() + "://" + uri.getAuthority();
+    }
+
+    private static String convertAndCheckUri(String uriStr) throws AnalysisException {
+        if (StringUtils.isBlank(uriStr)) {
+            throw new IllegalArgumentException("uri is null, pls check your 
params");
+        }
+        URI uri = URI.create(uriStr);
+        String schema = uri.getScheme();
+        if (StringUtils.isBlank(schema)) {
+            throw new IllegalArgumentException("Invalid uri: " + uriStr + 
"extract schema is null");
+        }
+        if (!supportSchema.contains(schema.toLowerCase())) {
+            throw new IllegalArgumentException("Invalid export path:"
+                    + schema + " , please use valid 'hdfs://' or 'viewfs://' path.");
+        }
+        return uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index d17504ba3de..7435efab2da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -32,12 +32,12 @@ import java.util.Map;
 public class S3Properties extends AbstractObjectStorageProperties {
 
 
-    @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "access_key"},
+    @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "endpoint", 
"ENDPOINT"},
             required = false,
             description = "The endpoint of S3.")
     protected String s3Endpoint = "";
 
-    @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "region"},
+    @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "REGION"},
             required = false,
             description = "The region of S3.")
     protected String s3Region = "us-east-1";
@@ -46,7 +46,7 @@ public class S3Properties extends AbstractObjectStorageProperties {
             description = "The access key of S3.")
     protected String s3AccessKey = "";
 
-    @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", 
"secret_key"},
+    @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", 
"secret_key", "SECRET_KEY"},
             description = "The secret key of S3.")
     protected String s3SecretKey = "";
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 555a878937f..44e940924b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -107,7 +107,7 @@ public abstract class StorageProperties extends ConnectionProperties {
     public static StorageProperties createStorageProperties(Map<String, String> origProps) {
         StorageProperties storageProperties = null;
         // 1. parse the storage properties by user specified fs.xxx.support properties
-        if (isFsSupport(origProps, FS_HDFS_SUPPORT)) {
+        if (isFsSupport(origProps, FS_HDFS_SUPPORT) || HDFSProperties.guessIsMe(origProps)) {
             storageProperties = new HDFSProperties(origProps);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 49023f5a989..bb1b352fc3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -68,13 +68,14 @@ public class DFSFileSystem extends RemoteFileSystem {
 
 
         super(StorageBackend.StorageType.HDFS.name(), StorageBackend.StorageType.HDFS);
-        this.properties.putAll(properties);
+        this.properties.putAll(hdfsProperties.getOrigProps());
+        this.storageProperties = hdfsProperties;
         this.hdfsProperties = hdfsProperties;
     }
 
     public DFSFileSystem(HDFSProperties hdfsProperties, StorageBackend.StorageType storageType) {
         super(storageType.name(), storageType);
-        this.properties.putAll(properties);
+        this.properties.putAll(hdfsProperties.getOrigProps());
         this.hdfsProperties = hdfsProperties;
     }
 
@@ -222,7 +223,7 @@ public class DFSFileSystem extends RemoteFileSystem {
      * @throws IOException when read data error.
      */
     private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length)
-                throws IOException {
+            throws IOException {
         synchronized (fsDataInputStream) {
             long currentStreamOffset;
             try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
index 37984981486..5c4d9583296 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
@@ -18,15 +18,12 @@
 package org.apache.doris.tablefunction;
 
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.analysis.StorageBackend.StorageType;
-import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.URI;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.thrift.TFileType;
 
-import com.google.common.base.Strings;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -47,29 +44,16 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
 
     private void init(Map<String, String> properties) throws AnalysisException {
         // 1. analyze common properties
-        Map<String, String> otherProps = super.parseCommonProperties(properties);
+        Map<String, String> props = super.parseCommonProperties(properties);
 
+        this.storageProperties = StorageProperties.createStorageProperties(props);
+        locationProperties.putAll(storageProperties.getBackendConfigProperties());
         // 2. analyze uri
-        String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
-        if (Strings.isNullOrEmpty(uriStr)) {
-            throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
-        }
-        URI uri = URI.create(uriStr);
-        //fixme refactor in HDFSStorageProperties
-        StorageBackend.checkUri(uri, StorageType.HDFS);
-        filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
-
-        // 3. analyze other properties
-        for (String key : otherProps.keySet()) {
-            if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
-                locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key));
-            } else {
-                locationProperties.put(key, otherProps.get(key));
-            }
-        }
-        // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority
-        if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
-            locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority());
+        try {
+            String    uri = storageProperties.checkLoadPropsAndReturnUri(props);
+            filePath = storageProperties.convertUrlToFilePath(uri);
+        } catch (UserException e) {
+            throw new AnalysisException("Failed check storage props", e);
         }
 
         if (!FeConstants.runningUnitTest) {
@@ -92,7 +76,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
 
     @Override
     public BrokerDesc getBrokerDesc() {
-        return new BrokerDesc("HdfsTvfBroker", StorageType.HDFS, 
locationProperties);
+        return new BrokerDesc("HdfsTvfBroker", locationProperties, 
storageProperties);
     }
 
     // =========== implement abstract methods of TableValuedFunctionIf =================
diff --git a/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy b/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy
new file mode 100644
index 00000000000..bddefc51d1d
--- /dev/null
+++ b/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static groovy.test.GroovyAssert.shouldFail
+
+suite("refactor_params_hdfs_all_test", 
"p0,external,kerberos,external_docker,external_docker_kerberos") {
+    String enabled = 
context.config.otherConfigs.get("enableRefactorParamsHdfsTest")
+    if (enabled == null || enabled.equalsIgnoreCase("false")) {
+        return
+    }
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def table = "hdfs_all_test";
+
+    def databaseQueryResult = sql """
+       select database();
+    """
+    println databaseQueryResult
+    def currentDBName = databaseQueryResult.get(0).get(0)
+    println currentDBName
+    // cos
+
+    def createDBAndTbl = { String dbName ->
+
+        sql """
+                drop database if exists ${dbName}
+            """
+
+        sql """
+            create database ${dbName}
+        """
+
+        sql """
+             use ${dbName}
+             """
+        sql """
+        CREATE TABLE ${table}(
+            user_id            BIGINT       NOT NULL COMMENT "user id",
+            name               VARCHAR(20)           COMMENT "name",
+            age                INT                   COMMENT "age"
+        )
+        DUPLICATE KEY(user_id)
+        DISTRIBUTED BY HASH(user_id) BUCKETS 10
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+        sql """
+        insert into ${table} values (1, 'a', 10);
+    """
+
+        def insertResult = sql """
+        SELECT count(1) FROM ${table}
+    """
+
+        println "insertResult: ${insertResult}"
+
+        assert insertResult.get(0).get(0) == 1
+    }
+
+    def hdfsNonXmlParams = "\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:8520\",\n" +
+            "\"hadoop.kerberos.min.seconds.before.relogin\" = \"5\",\n" +
+            "\"hadoop.security.authentication\" = \"kerberos\",\n" +
+            "\"hadoop.kerberos.principal\"=\"hive/presto-master.docker.clus...@labs.teradata.com\",\n" +
+            "\"hadoop.kerberos.keytab\" = \"/mnt/disk1/gq/keytabs/keytabs/hive-presto-master.keytab\",\n" +
+            "\"hive.metastore.sasl.enabled \" = \"true\",\n" +
+            "\"hadoop.security.auth_to_local\" = \"RULE:[2:\\\$1@\\\$0](.*@LABS.TERADATA.COM)s/@.*//\n" +
+            "                                   RULE:[2:\\\$1@\\\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//\n" +
+            "                                   RULE:[2:\\\$1@\\\$0](.*@OTHERREALM.COM)s/@.*//\n" +
+            "                                   DEFAULT\""
+
+    def createRepository = { String repoName, String location, String hdfsParams ->
+        try {
+            sql """
+                drop repository  ${repoName};
+            """
+        } catch (Exception e) {
+            // ignore exception, repo may not exist
+        }
+
+        sql """
+            CREATE REPOSITORY  ${repoName}
+            WITH HDFS
+            ON LOCATION "${location}"
+            PROPERTIES (
+                ${hdfsParams}
+            );
+        """
+    }
+
+    def backupAndRestore = { String repoName, String dbName, String tableName, String backupLabel ->
+        sql """
+        BACKUP SNAPSHOT ${dbName}.${backupLabel}
+        TO ${repoName}
+        ON (${tableName})
+    """
+        Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until(
+                {
+                    def backupResult = sql """
+                show backup from ${dbName} where SnapshotName = '${backupLabel}';
+            """
+                    println "backupResult: ${backupResult}"
+                    return backupResult.get(0).get(3) == "FINISHED"
+                })
+
+        def querySnapshotResult = sql """
+        SHOW SNAPSHOT ON ${repoName} WHERE SNAPSHOT =  '${backupLabel}';
+        """
+        println querySnapshotResult
+        def snapshotTimes = querySnapshotResult.get(0).get(1).split('\n')
+        def snapshotTime = snapshotTimes[0]
+
+        sql """
+        drop table  if exists ${tableName}; 
+        """
+
+        sql """
+        RESTORE SNAPSHOT ${dbName}.${backupLabel}
+        FROM ${repoName}
+        ON (`${tableName}`)
+        PROPERTIES
+        (
+            "backup_timestamp"="${snapshotTime}",
+            "replication_num" = "1"
+        );
+        """
+        Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until(
+                {
+                    try {
+
+                        sql """
+                        use ${dbName}
+                        """
+                        def restoreResult = sql """
+                         SELECT count(1) FROM ${tableName}
+                        """
+                        println "restoreResult: ${restoreResult}"
+                        def count = restoreResult.get(0).get(0)
+                        println "count: ${count}"
+                        return restoreResult.get(0).get(0) == 1
+                    } catch (Exception e) {
+                        // tbl not found
+                        println "tbl not found" + e.getMessage()
+                        return false
+                    }
+                })
+    }
+    def hdfs_tvf = { filePath, hdfsParam ->
+
+        def hdfs_tvf_sql = sql """
+        select * from hdfs
+
+        (
+          'uri'='${filePath}',
+          "format" = "csv",
+           ${hdfsParam}
+        );
+       """
+    }
+    def export_hdfs = { defaultFs, hdfsParams ->
+        def exportPath = defaultFs + "/test/_export/" + System.currentTimeMillis()
+        def exportLabel = "export_" + System.currentTimeMillis();
+        sql """
+                EXPORT TABLE ${table}
+                TO "${exportPath}"
+                PROPERTIES
+                (
+                "label"="${exportLabel}",
+                "line_delimiter" = ","
+                )
+                with HDFS
+                (
+              
+                 ${hdfsParams}
+                );
+              """
+
+        databaseQueryResult = sql """
+            select database();
+         """
+        currentDBName = databaseQueryResult.get(0).get(0)
+        Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({
+            def exportResult = sql """
+                 SHOW EXPORT FROM ${currentDBName} WHERE LABEL = "${exportLabel}";
+                
+                """
+
+            println exportResult
+
+            if (null == exportResult || exportResult.isEmpty() || null == exportResult.get(0) || exportResult.get(0).size() < 3) {
+                return false;
+            }
+            if (exportResult.get(0).get(2) == 'CANCELLED' || exportResult.get(0).get(2) == 'FAILED') {
+                throw new RuntimeException("load failed")
+            }
+
+            return exportResult.get(0).get(2) == 'FINISHED'
+        })
+
+    }
+    def outfile_to_hdfs = { defaultFs, hdfsParams ->
+        def outFilePath = "${defaultFs}/outfile_different_hdfs/exp_"
+        // select ... into outfile ...
+        def res = sql """
+            SELECT * FROM ${table}  ORDER BY user_id
+            INTO OUTFILE "${outFilePath}"
+            FORMAT AS CSV
+            PROPERTIES (
+                ${hdfsParams}
+            );
+        """
+        return res[0][3]
+    }
+    def hdfsLoad = { filePath, hdfsParams ->
+        databaseQueryResult = sql """
+       select database();
+    """
+        println databaseQueryResult
+        def dataCountResult = sql """
+            SELECT count(*) FROM ${table}
+        """
+        def dataCount = dataCountResult[0][0]
+        def label = "hdfs_load_label_" + System.currentTimeMillis()
+        def load = sql """
+            LOAD LABEL `${label}` (
+           data infile ("${filePath}")
+           into table ${table}
+            COLUMNS TERMINATED BY "\\\t"
+            FORMAT AS "CSV"
+             (
+                user_id,
+                name,
+                age
+             ))
+             with hdfs
+             (
+              ${hdfsParams}
+             )
+             PROPERTIES
+            (
+                "timeout" = "3600"
+            );
+        """
+        Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({
+            def loadResult = sql """
+           show load where label = '${label}';
+           """
+            println 'test'
+            println loadResult
+
+            if (null == loadResult || loadResult.isEmpty() || null == loadResult.get(0) || loadResult.get(0).size() < 3) {
+                return false;
+            }
+            if (loadResult.get(0).get(2) == 'CANCELLED' || loadResult.get(0).get(2) == 'FAILED') {
+                throw new RuntimeException("load failed")
+            }
+
+            return loadResult.get(0).get(2) == 'FINISHED'
+        })
+
+
+        def expectedCount = dataCount + 1
+        Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({
+            def loadResult = sql """
+            select count(*) from ${table}
+        """
+            println "loadResult: ${loadResult} "
+            return loadResult.get(0).get(0) == expectedCount
+        })
+
+    }
+    def defaultFs = 'hdfs://172.20.32.136:8520'
+    def repoName = 'hdfs_non_xml_repo';
+    // create repo
+    createRepository(repoName,"${defaultFs}/test_repo",hdfsNonXmlParams);
+    def dbName1 = currentDBName + "${repoName}_1"
+    createDBAndTbl(dbName1)
+    def backupLabel=repoName+System.currentTimeMillis()
+    //backup and restore
+    backupAndRestore(repoName,dbName1,table,backupLabel)
+    def failedRepoName='failedRepo'
+    shouldFail {
+        createRepository(failedRepoName,"s3://172.20.32.136:8520",hdfsNonXmlParams);
+    }
+    shouldFail {
+        createRepository(failedRepoName," ",hdfsNonXmlParams);
+    }
+
+    //outfile 
+    dbName1 = currentDBName + 'outfile_test_1'
+    createDBAndTbl(dbName1)
+    def outfile = outfile_to_hdfs(defaultFs, hdfsNonXmlParams);
+    println outfile
+    //hdfs tvf
+    def hdfsTvfResult = hdfs_tvf(outfile, hdfsNonXmlParams)
+    println hdfsTvfResult
+
+    //hdfsLoad(outfile,hdfsNonXmlParams)
+
+    //export 
+    export_hdfs(defaultFs, hdfsNonXmlParams)
+
+
+}
\ No newline at end of file

