This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6f534922432 branch-4.0: [improvement](tvf load) Add fast fail in Tvf 
load #56273 (#56580)
6f534922432 is described below

commit 6f534922432b9490da8a60dc78309bb1c28585f8
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Sep 29 09:35:47 2025 +0800

    branch-4.0: [improvement](tvf load) Add fast fail in Tvf load #56273 
(#56580)
    
    Cherry-picked from #56273
    
    Co-authored-by: Refrain <[email protected]>
---
 .../java/org/apache/doris/common/util/S3Util.java  |  59 ++++++
 .../nereids/trees/plans/commands/LoadCommand.java  |  90 +--------
 .../ExternalFileTableValuedFunction.java           |   7 +
 .../suites/load_p0/tvf/test_tvf_fast_fail.groovy   | 217 +++++++++++++++++++++
 4 files changed, 289 insertions(+), 84 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index c147fce6801..85d5f4d0347 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,15 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.credentials.CloudCredential;
 
 import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -43,10 +49,17 @@ import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.sts.StsClient;
 import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
+import java.net.HttpURLConnection;
 import java.net.URI;
+import java.net.URL;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 public class S3Util {
+    private static final Logger LOG = LogManager.getLogger(Util.class);
+
     private static AwsCredentialsProvider 
getAwsCredencialsProvider(CloudCredential credential) {
         AwsCredentials awsCredential;
         AwsCredentialsProvider awsCredentialsProvider;
@@ -237,4 +250,50 @@ public class S3Util {
 
         return globPattern.substring(0, earliestSpecialCharIndex);
     }
+
+    // Fast fail validation for S3 endpoint connectivity to avoid retries and long waits
+    // when network conditions are poor. Validates endpoint format, whitelist, security,
+    // and tests connection with 10s timeout.
+    public static void validateAndTestEndpoint(String endpoint) throws 
UserException {
+        HttpURLConnection connection = null;
+        try {
+            String urlStr = endpoint;
+            // Add default protocol if not specified
+            if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) {
+                urlStr = "http://" + endpoint;
+            }
+            endpoint = endpoint.replaceFirst("^http://", "");
+            endpoint = endpoint.replaceFirst("^https://", "");
+            List<String> whiteList = new 
ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+            whiteList.removeIf(String::isEmpty);
+            if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
+                throw new UserException("endpoint: " + endpoint
+                    + " is not in s3 load endpoint white list: " + 
String.join(",", whiteList));
+            }
+            SecurityChecker.getInstance().startSSRFChecking(urlStr);
+            URL url = new URL(urlStr);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setConnectTimeout(10000);
+            connection.connect();
+        } catch (Exception e) {
+            String msg;
+            if (e instanceof UserException) {
+                msg = ((UserException) e).getDetailMessage();
+            } else {
+                LOG.warn("Failed to connect endpoint={}, err={}", endpoint, e);
+                msg = e.getMessage();
+            }
+            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                "Failed to access object storage, message=" + msg, e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.disconnect();
+                } catch (Exception e) {
+                    LOG.warn("Failed to disconnect connection, endpoint={}, 
err={}", endpoint, e);
+                }
+            }
+            SecurityChecker.getInstance().stopSSRFChecking();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index 6905191a0f6..9318e24e913 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -26,15 +26,13 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.security.SecurityChecker;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
 import org.apache.doris.load.EtlJobType;
@@ -57,10 +55,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -451,7 +445,11 @@ public class LoadCommand extends Command implements 
NeedAuditEncryption, Forward
             }
         } else if (brokerDesc != null) {
             etlJobType = EtlJobType.BROKER;
-            checkS3Param();
+            if (brokerDesc.getFileType() != null && 
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
+                ObjectStorageProperties storageProperties = 
(ObjectStorageProperties) brokerDesc.getStorageProperties();
+                String endpoint = storageProperties.getEndpoint();
+                S3Util.validateAndTestEndpoint(endpoint);
+            }
         } else {
             etlJobType = EtlJobType.UNKNOWN;
         }
@@ -470,82 +468,6 @@ public class LoadCommand extends Command implements 
NeedAuditEncryption, Forward
         handleLoadCommand(ctx, executor);
     }
 
-    /**
-     * check for s3 param
-     */
-    public void checkS3Param() throws UserException {
-        if (brokerDesc.getFileType() != null && 
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
-            ObjectStorageProperties storageProperties = 
(ObjectStorageProperties) brokerDesc.getStorageProperties();
-            String endpoint = storageProperties.getEndpoint();
-            checkEndpoint(endpoint);
-            checkWhiteList(endpoint);
-            List<String> filePaths = new ArrayList<>();
-            if (dataDescriptions != null && !dataDescriptions.isEmpty()) {
-                for (NereidsDataDescription dataDescription : 
dataDescriptions) {
-                    if (dataDescription.getFilePaths() != null) {
-                        for (String filePath : dataDescription.getFilePaths()) 
{
-                            if (filePath != null && !filePath.isEmpty()) {
-                                filePaths.add(filePath);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * check endpoint
-     */
-    private void checkEndpoint(String endpoint) throws UserException {
-        HttpURLConnection connection = null;
-        try {
-            String urlStr = endpoint;
-            // Add default protocol if not specified
-            if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) {
-                urlStr = "http://" + endpoint;
-            }
-            SecurityChecker.getInstance().startSSRFChecking(urlStr);
-            URL url = new URL(urlStr);
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setConnectTimeout(10000);
-            connection.connect();
-        } catch (Exception e) {
-            LOG.warn("Failed to connect endpoint={}, err={}", endpoint, e);
-            String msg;
-            if (e instanceof UserException) {
-                msg = ((UserException) e).getDetailMessage();
-            } else {
-                msg = e.getMessage();
-            }
-            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
-                "Failed to access object storage, message=" + msg, e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.disconnect();
-                } catch (Exception e) {
-                    LOG.warn("Failed to disconnect connection, endpoint={}, 
err={}", endpoint, e);
-                }
-            }
-            SecurityChecker.getInstance().stopSSRFChecking();
-        }
-    }
-
-    /**
-     * check WhiteList
-     */
-    public void checkWhiteList(String endpoint) throws UserException {
-        endpoint = endpoint.replaceFirst("^http://", "");
-        endpoint = endpoint.replaceFirst("^https://", "");
-        List<String> whiteList = new 
ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
-        whiteList.removeIf(String::isEmpty);
-        if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
-            throw new UserException("endpoint: " + endpoint
-                + " is not in s3 load endpoint white list: " + 
String.join(",", whiteList));
-        }
-    }
-
     /**
      * this method is from StmtExecutor.handleLoadStmt()
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 3961ce5de44..2c4fb0651e6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -40,10 +40,12 @@ import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.common.util.S3Util;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
 import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import 
org.apache.doris.datasource.property.fileformat.TextFileFormatProperties;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -151,6 +153,11 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         String path = getFilePath();
         BrokerDesc brokerDesc = getBrokerDesc();
         try {
+            if (brokerDesc.getFileType() != null && 
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
+                ObjectStorageProperties storageProperties = 
(ObjectStorageProperties) brokerDesc.getStorageProperties();
+                String endpoint = storageProperties.getEndpoint();
+                S3Util.validateAndTestEndpoint(endpoint);
+            }
             BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
         } catch (UserException e) {
             throw new AnalysisException("parse file failed, err: " + 
e.getMessage(), e);
diff --git a/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy 
b/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy
new file mode 100644
index 00000000000..d5f69b81276
--- /dev/null
+++ b/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_fast_fail", "p0,external,tvf") {
+    String enabled = context.config.otherConfigs.get("enableS3Test")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable S3 test.")
+        return;
+    }
+
+    def tableName = "tvf_fast_fail_test"
+    
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            r_reason_sk bigint,
+            r_reason_id char(16),
+            r_reason_desc char(100)
+        )
+        DUPLICATE KEY(r_reason_sk)
+        DISTRIBUTED BY HASH(r_reason_sk) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    // Test 1: Invalid endpoint should fail fast (within 15 seconds)
+    def testInvalidEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+        
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'invalid-endpoint.com',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage()
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+        
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+        
+        logger.info("Test invalid endpoint - Duration: ${duration} seconds, 
Error: ${errorOccurred}")
+        
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for invalid endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 
seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") || 
+                  errorMessage.contains("invalid-endpoint.com"), 
+                  "Error message should indicate connection failure: 
${errorMessage}")
+    }
+
+    // Test 2: Non-existent endpoint should fail fast
+    def testNonExistentEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+        
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'non-existent-s3-endpoint-12345.com',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage()
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+        
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+        
+        logger.info("Test non-existent endpoint - Duration: ${duration} 
seconds, Error: ${errorOccurred}")
+        
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for non-existent endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 
seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") || 
+                  errorMessage.contains("non-existent-s3-endpoint-12345.com"), 
+                  "Error message should indicate connection failure: 
${errorMessage}")
+    }
+
+    // Test 3: Malformed endpoint should fail fast
+    def testMalformedEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+        
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'malformed-url-without-domain',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage()
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+        
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+        
+        logger.info("Test malformed endpoint - Duration: ${duration} seconds, 
Error: ${errorOccurred}")
+        
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for malformed endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 
seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") || 
+                  errorMessage.contains("malformed-url-without-domain"), 
+                  "Error message should indicate connection failure: 
${errorMessage}")
+    }
+
+    // Test 4: Compare with valid endpoint (if available) - this should work 
or fail for other reasons
+    def testValidEndpointComparison = {
+        if (context.config.otherConfigs.get("s3Endpoint") != null) {
+            def validEndpoint = context.config.otherConfigs.get("s3Endpoint")
+            def startTime = System.currentTimeMillis()
+            def errorOccurred = false
+            def errorMessage = ""
+            
+            try {
+                sql """
+                    INSERT INTO ${tableName}
+                    SELECT * FROM S3(
+                        'uri' = 's3://non-existent-bucket-12345/test.csv',
+                        'access_key' = 'test_access_key',
+                        'secret_key' = 'test_secret_key',
+                        's3.endpoint' = '${validEndpoint}',
+                        'region' = 'us-east-1',
+                        'format' = 'csv',
+                        'column_separator' = '|'
+                    );
+                """
+            } catch (Exception e) {
+                errorOccurred = true
+                errorMessage = e.getMessage()
+                logger.info("Error with valid endpoint (expected due to 
invalid credentials): ${errorMessage}")
+            }
+            
+            def endTime = System.currentTimeMillis()
+            def duration = (endTime - startTime) / 1000.0
+            
+            logger.info("Test valid endpoint - Duration: ${duration} seconds, 
Error: ${errorOccurred}")
+            
+            // With valid endpoint, the error should be about 
credentials/bucket, not connection
+            if (errorOccurred) {
+                assertFalse(errorMessage.contains("Failed to access object 
storage") && 
+                           errorMessage.contains(validEndpoint), 
+                           "Valid endpoint should not have connection failure: 
${errorMessage}")
+            }
+        } else {
+            logger.info("Skipping valid endpoint test - no valid endpoint 
configured")
+        }
+    }
+
+    // Run all tests
+    logger.info("=== Starting TVF Fast Fail Tests ===")
+    
+    logger.info("Test 1: Invalid endpoint fast fail")
+    testInvalidEndpoint()
+    
+    logger.info("Test 2: Non-existent endpoint fast fail")  
+    testNonExistentEndpoint()
+    
+    logger.info("Test 3: Malformed endpoint fast fail")
+    testMalformedEndpoint()
+    
+    logger.info("Test 4: Valid endpoint comparison")
+    testValidEndpointComparison()
+    
+    logger.info("=== TVF Fast Fail Tests Completed Successfully ===")
+
+    // Cleanup
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to