This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cb544b3d8 [Enhancement](s3-load) Add domain connection and aksk correction check for S3 load (#36711)
95cb544b3d8 is described below

commit 95cb544b3d856f1beb07b51a5704f634840b6fa3
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Tue Jul 2 21:44:24 2024 +0800

    [Enhancement](s3-load) Add domain connection and aksk correction check for S3 load (#36711)
    
    Add domain connection and aksk correction check for S3 load before
    actual execution.
---
 .../java/org/apache/doris/analysis/LoadStmt.java   |  88 +++++++++--
 .../property/constants/S3Properties.java           |   1 +
 ...t_domain_connection_and_ak_sk_correction.groovy | 161 +++++++++++++++++++++
 3 files changed, 241 insertions(+), 9 deletions(-)
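
The probe behind this check is deliberately cheap: for each bucket referenced by the load, the new checkAkSk() in the diff below issues a HEAD on a dummy key plus a short LIST, so a wrong endpoint or wrong AK/SK fails at analysis time instead of mid-load. A minimal sketch of the same idea written directly against the AWS SDK v2 (the patch itself goes through Doris's RemoteBase abstraction; the client wiring, the https scheme, and the "1" probe key here are illustrative assumptions):

    import java.net.URI;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.S3Exception;

    public class S3ProbeSketch {
        // Fails fast on a wrong endpoint (connect error) or wrong AK/SK (403);
        // a 404 on the dummy key means connectivity and credentials are fine.
        public static void probe(String endpoint, String region,
                                 String ak, String sk, String bucket) {
            try (S3Client client = S3Client.builder()
                    .endpointOverride(URI.create("https://" + endpoint))
                    .region(Region.of(region))
                    .credentialsProvider(StaticCredentialsProvider.create(
                            AwsBasicCredentials.create(ak, sk)))
                    .build()) {
                try {
                    // HEAD on a key that almost certainly does not exist.
                    client.headObject(HeadObjectRequest.builder()
                            .bucket(bucket).key("1").build());
                } catch (S3Exception e) {
                    if (e.statusCode() != 404) { // 404 = key absent, auth OK
                        throw e;
                    }
                }
                // A LIST exercises the second permission the load path needs.
                client.listObjectsV2(ListObjectsV2Request.builder()
                        .bucket(bucket).maxKeys(1).build());
            }
        }
    }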

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index d8d515fe6a4..1990078b46c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,10 +21,14 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
 import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.cloud.storage.RemoteBase;
+import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
@@ -500,7 +504,7 @@ public class LoadStmt extends DdlStmt {
             }
         } else if (brokerDesc != null) {
             etlJobType = EtlJobType.BROKER;
-            checkWhiteList();
+            checkS3Param();
         } else if (isMysqlLoad) {
             etlJobType = EtlJobType.LOCAL_FILE;
         } else {
@@ -518,6 +522,26 @@ public class LoadStmt extends DdlStmt {
         user = ConnectContext.get().getQualifiedUser();
     }
 
+
+    private String getProviderFromEndpoint() {
+        Map<String, String> properties = brokerDesc.getProperties();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
+                return entry.getValue();
+            }
+        }
+        return S3Properties.S3_PROVIDER;
+    }
+
+    private String getBucketFromFilePath(String filePath) throws Exception {
+        String[] parts = filePath.split("\\/\\/");
+        if (parts.length < 2) {
+            throw new Exception("filePath is not valid");
+        }
+        String bucket = parts[1].split("\\/")[0];
+        return bucket;
+    }
+
     public String getComment() {
         return comment;
     }
@@ -597,7 +621,7 @@ public class LoadStmt extends DdlStmt {
         }
     }
 
-    public void checkWhiteList() throws UserException {
+    public void checkS3Param() throws UserException {
         Map<String, String> brokerDescProperties = brokerDesc.getProperties();
         if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
                 && brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
@@ -606,17 +630,63 @@ public class LoadStmt extends DdlStmt {
             String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
             endpoint = endpoint.replaceFirst("^http://", "");
             endpoint = endpoint.replaceFirst("^https://", "");
-            List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
-            whiteList.removeIf(String::isEmpty);
-            if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
-                throw new UserException("endpoint: " + endpoint
-                    + " is not in s3 load endpoint white list: " + String.join(",", whiteList));
-            }
             brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
-            if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
+            checkWhiteList(endpoint);
+            if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
                 return;
             }
             checkEndpoint(endpoint);
+            checkAkSk();
+        }
+    }
+
+    public void checkWhiteList(String endpoint) throws UserException {
+        List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+        whiteList.removeIf(String::isEmpty);
+        if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
+            throw new UserException("endpoint: " + endpoint
+                    + " is not in s3 load endpoint white list: " + 
String.join(",", whiteList));
         }
     }
+
+    private void checkAkSk() throws UserException {
+        RemoteBase remote = null;
+        ObjectInfo objectInfo = null;
+        try {
+            Map<String, String> brokerDescProperties = brokerDesc.getProperties();
+            String provider = getProviderFromEndpoint();
+            for (DataDescription dataDescription : dataDescriptions) {
+                for (String filePath : dataDescription.getFilePaths()) {
+                    String bucket = getBucketFromFilePath(filePath);
+                    objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
+                            brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
+                            brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
+                            bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
+                            brokerDescProperties.get(S3Properties.Env.REGION), "");
+                    remote = RemoteBase.newInstance(objectInfo);
+                    // RemoteBase#headObject does not throw exception if key does not exist.
+                    remote.headObject("1");
+                    remote.listObjects(null);
+                    remote.close();
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed check object info={}", objectInfo, e);
+            String message = e.getMessage();
+            if (message != null) {
+                int index = message.indexOf("Error message=");
+                if (index != -1) {
+                    message = message.substring(index);
+                }
+            }
+            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                    "Incorrect object storage info, " + message);
+        } finally {
+            if (remote != null) {
+                remote.close();
+            }
+        }
+
+    }
+
 }
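
A note on getBucketFromFilePath() above: it extracts the bucket with two plain string splits rather than a URI parser, so only the scheme separator and the first path segment matter. A self-contained sketch of that behavior (the wrapper class and main() are ours, for illustration only):

    public class BucketParseSketch {
        // "s3://my-bucket/tpch/part.tbl" splits on "//" into
        // ["s3:", "my-bucket/tpch/part.tbl"]
        static String getBucketFromFilePath(String filePath) throws Exception {
            String[] parts = filePath.split("\\/\\/");
            if (parts.length < 2) {
                throw new Exception("filePath is not valid");
            }
            // First "/" segment of the remainder is the bucket name.
            return parts[1].split("\\/")[0];
        }

        public static void main(String[] args) throws Exception {
            // prints: my-bucket
            System.out.println(getBucketFromFilePath("s3://my-bucket/regression/tpch/sf1/part.tbl"));
        }
    }
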
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index d1b3b17e2da..a0ef74c7a96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -58,6 +58,7 @@ public class S3Properties extends BaseProperties {
     public static final String MAX_CONNECTIONS = "s3.connection.maximum";
     public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
     public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
+    public static final String S3_PROVIDER = "S3";
 
     // required by storage policy
     public static final String ROOT_PATH = "s3.root.path";
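
The new S3_PROVIDER constant is the fallback that getProviderFromEndpoint() returns when no provider property is present; checkAkSk() then maps the string onto the cloud proto enum with valueOf(). A small sketch of that resolution, assuming a stand-in enum (the real values live in ObjectStoreInfoPB.Provider, and the "provider" property key stands in for S3Properties.PROVIDER):

    import java.util.Map;

    public class ProviderResolveSketch {
        // Stand-in for ObjectStoreInfoPB.Provider; illustrative subset only.
        enum Provider { S3, OSS }

        static Provider resolve(Map<String, String> props) {
            String provider = "S3"; // mirrors the S3Properties.S3_PROVIDER fallback
            for (Map.Entry<String, String> e : props.entrySet()) {
                if (e.getKey().equalsIgnoreCase("provider")) {
                    provider = e.getValue();
                }
            }
            return Provider.valueOf(provider.toUpperCase()); // as in checkAkSk()
        }

        public static void main(String[] args) {
            System.out.println(resolve(Map.of("PROVIDER", "oss"))); // OSS
            System.out.println(resolve(Map.of()));                  // S3 (fallback)
        }
    }
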
diff --git a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
new file mode 100644
index 00000000000..889da246d3b
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_domain_connection_and_ak_sk_correction",  "load_p0") {
+    // create table
+    def tableName = 'test_domain_connection_and_ak_sk_correction'
+    def tableNameOrders = 'test_domain_connection_and_ak_sk_correction_orders'
+    sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
+    sql """ 
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+        P_PARTKEY     INTEGER NOT NULL,
+        P_NAME        VARCHAR(55) NOT NULL,
+        P_MFGR        CHAR(25) NOT NULL,
+        P_BRAND       CHAR(10) NOT NULL,
+        P_TYPE        VARCHAR(25) NOT NULL,
+        P_SIZE        INTEGER NOT NULL,
+        P_CONTAINER   CHAR(10) NOT NULL,
+        P_RETAILPRICE DECIMAL(15,2) NOT NULL,
+        P_COMMENT     VARCHAR(23) NOT NULL 
+        )
+        DUPLICATE KEY(P_PARTKEY, P_NAME)
+        DISTRIBUTED BY HASH(P_PARTKEY) BUCKETS 3
+        PROPERTIES (
+              "replication_num" = "1"
+        );
+    """
+    sql """
+      CREATE TABLE IF NOT EXISTS ${tableNameOrders}  (
+        O_ORDERKEY       INTEGER NOT NULL,
+        O_CUSTKEY        INTEGER NOT NULL,
+        O_ORDERSTATUS    CHAR(1) NOT NULL,
+        O_TOTALPRICE     DECIMAL(15,2) NOT NULL,
+        O_ORDERDATE      DATE NOT NULL,
+        O_ORDERPRIORITY  CHAR(15) NOT NULL,  
+        O_CLERK          CHAR(15) NOT NULL, 
+        O_SHIPPRIORITY   INTEGER NOT NULL,
+        O_COMMENT        VARCHAR(79) NOT NULL
+        )
+        DUPLICATE KEY(O_ORDERKEY, O_CUSTKEY)
+        DISTRIBUTED BY HASH(O_ORDERKEY) BUCKETS 32
+        PROPERTIES (
+              "replication_num" = "1"
+        );
+    """
+
+
+    def label = UUID.randomUUID().toString().replace("-", "")
+    def result = sql """
+        LOAD LABEL ${label}
+        (
+            DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY "|"
+            (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+        )
+        WITH S3
+        (
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "AWS_ACCESS_KEY" = "${getS3AK()}",
+            "AWS_SECRET_KEY" = "${getS3SK()}",
+            "AWS_REGION" = "${getS3Region()}"
+        );
+    """
+    logger.info("the first sql result is {}", result)
+
+    label = UUID.randomUUID().toString().replace("-", "")
+    try {
+        result = sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+            )
+            WITH S3
+            (
+                "AWS_ENDPOINT" = "${getS3Endpoint()}1",
+                "AWS_ACCESS_KEY" = "${getS3AK()}",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_REGION" = "${getS3Region()}"
+            );
+        """
+        logger.info("the second sql result is {}", result)
+        assertTrue(false. "The endpoint is wrong, so the connection test 
should fale")
+    } catch (Exception e) {
+        logger.info("the second sql exception result is {}", e.getMessage())
+        assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "")
+    try {
+        result = sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+            )
+            WITH S3
+            (
+                "AWS_ENDPOINT" = "${getS3Endpoint()}",
+                "AWS_ACCESS_KEY" = "${getS3AK()}1",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_REGION" = "${getS3Region()}"
+            );
+        """
+        logger.info("the third sql result is {}", result)
+        assertTrue(false. "AK is wrong, so the correction of AKSK test should 
fale")
+    } catch (Exception e) {
+        logger.info("the third sql exception result is {}", e.getMessage())
+        assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+    }
+
+    label = UUID.randomUUID().toString().replace("-", "")
+    try {
+        result = sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp),
+                DATA INFILE("s3://${getS3BucketName()}1/regression/tpch/sf1/orders.tbl.1", "s3://${getS3BucketName()}/regression/tpch/sf1/orders.tbl.2")
+                INTO TABLE ${tableNameOrders}
+                COLUMNS TERMINATED BY "|"
+                (o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
+            )
+            WITH S3
+            (
+                "AWS_ENDPOINT" = "${getS3Endpoint()}",
+                "AWS_ACCESS_KEY" = "${getS3AK()}",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_REGION" = "${getS3Region()}"
+            );
+        """
+        logger.info("the fourth sql result is {}", result)
+        assertTrue(false. "in the second DATA INFILE, the first bucket is 
wrong, so the sql should fail")
+    } catch (Exception e) {
+        logger.info("the fourth sql exception result is {}", e.getMessage())
+        assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+    }
+    sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
+}
\ No newline at end of file

