This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6f534922432 branch-4.0: [improvement](tvf load) Add fast fail in Tvf
load #56273 (#56580)
6f534922432 is described below
commit 6f534922432b9490da8a60dc78309bb1c28585f8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Sep 29 09:35:47 2025 +0800
branch-4.0: [improvement](tvf load) Add fast fail in Tvf load #56273
(#56580)
Cherry-picked from #56273
Co-authored-by: Refrain <[email protected]>
---
.../java/org/apache/doris/common/util/S3Util.java | 59 ++++++
.../nereids/trees/plans/commands/LoadCommand.java | 90 +--------
.../ExternalFileTableValuedFunction.java | 7 +
.../suites/load_p0/tvf/test_tvf_fast_fail.groovy | 217 +++++++++++++++++++++
4 files changed, 289 insertions(+), 84 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index c147fce6801..85d5f4d0347 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,15 @@
package org.apache.doris.common.util;
+import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.credentials.CloudCredential;
import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -43,10 +49,17 @@ import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.sts.StsClient;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import java.net.HttpURLConnection;
import java.net.URI;
+import java.net.URL;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
public class S3Util {
+    private static final Logger LOG = LogManager.getLogger(S3Util.class);
+
private static AwsCredentialsProvider
getAwsCredencialsProvider(CloudCredential credential) {
AwsCredentials awsCredential;
AwsCredentialsProvider awsCredentialsProvider;
@@ -237,4 +250,50 @@ public class S3Util {
return globPattern.substring(0, earliestSpecialCharIndex);
}
+
+    // Fast fail check for an S3 endpoint, avoiding long SDK retries when the network is poor: rejects an
+    // empty endpoint, then checks the whitelist and SSRF rules, then connects with a 10s connect timeout.
+    public static void validateAndTestEndpoint(String endpoint) throws UserException {
+        if (Strings.isNullOrEmpty(endpoint)) {
+            throw new UserException("s3 endpoint is not specified");
+        }
+        HttpURLConnection connection = null;
+        try {
+            String urlStr = endpoint;
+            if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) {
+                urlStr = "http://" + endpoint; // add default protocol if not specified
+            }
+            endpoint = endpoint.replaceFirst("^https?://", "");
+            List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+            whiteList.removeIf(String::isEmpty);
+            if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
+                throw new UserException("endpoint: " + endpoint
+                        + " is not in s3 load endpoint white list: " + String.join(",", whiteList));
+            }
+            SecurityChecker.getInstance().startSSRFChecking(urlStr);
+            URL url = new URL(urlStr);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setConnectTimeout(10000);
+            connection.connect();
+        } catch (Exception e) {
+            String msg;
+            if (e instanceof UserException) {
+                msg = ((UserException) e).getDetailMessage();
+            } else {
+                LOG.warn("Failed to connect endpoint={}", endpoint, e);
+                msg = e.getMessage();
+            }
+            throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                    "Failed to access object storage, message=" + msg, e);
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.disconnect();
+                } catch (Exception e) {
+                    LOG.warn("Failed to disconnect connection, endpoint={}", endpoint, e);
+                }
+            }
+            SecurityChecker.getInstance().stopSSRFChecking();
+        }
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index 6905191a0f6..9318e24e913 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -26,15 +26,13 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
import org.apache.doris.load.EtlJobType;
@@ -57,10 +55,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -451,7 +445,11 @@ public class LoadCommand extends Command implements
NeedAuditEncryption, Forward
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
- checkS3Param();
+ if (brokerDesc.getFileType() != null &&
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
+ ObjectStorageProperties storageProperties =
(ObjectStorageProperties) brokerDesc.getStorageProperties();
+ String endpoint = storageProperties.getEndpoint();
+ S3Util.validateAndTestEndpoint(endpoint);
+ }
} else {
etlJobType = EtlJobType.UNKNOWN;
}
@@ -470,82 +468,6 @@ public class LoadCommand extends Command implements
NeedAuditEncryption, Forward
handleLoadCommand(ctx, executor);
}
- /**
- * check for s3 param
- */
- public void checkS3Param() throws UserException {
- if (brokerDesc.getFileType() != null &&
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
- ObjectStorageProperties storageProperties =
(ObjectStorageProperties) brokerDesc.getStorageProperties();
- String endpoint = storageProperties.getEndpoint();
- checkEndpoint(endpoint);
- checkWhiteList(endpoint);
- List<String> filePaths = new ArrayList<>();
- if (dataDescriptions != null && !dataDescriptions.isEmpty()) {
- for (NereidsDataDescription dataDescription :
dataDescriptions) {
- if (dataDescription.getFilePaths() != null) {
- for (String filePath : dataDescription.getFilePaths())
{
- if (filePath != null && !filePath.isEmpty()) {
- filePaths.add(filePath);
- }
- }
- }
- }
- }
- }
- }
-
- /**
- * check endpoint
- */
- private void checkEndpoint(String endpoint) throws UserException {
- HttpURLConnection connection = null;
- try {
- String urlStr = endpoint;
- // Add default protocol if not specified
- if (!endpoint.startsWith("http://") &&
!endpoint.startsWith("https://")) {
- urlStr = "http://" + endpoint;
- }
- SecurityChecker.getInstance().startSSRFChecking(urlStr);
- URL url = new URL(urlStr);
- connection = (HttpURLConnection) url.openConnection();
- connection.setConnectTimeout(10000);
- connection.connect();
- } catch (Exception e) {
- LOG.warn("Failed to connect endpoint={}, err={}", endpoint, e);
- String msg;
- if (e instanceof UserException) {
- msg = ((UserException) e).getDetailMessage();
- } else {
- msg = e.getMessage();
- }
- throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
- "Failed to access object storage, message=" + msg, e);
- } finally {
- if (connection != null) {
- try {
- connection.disconnect();
- } catch (Exception e) {
- LOG.warn("Failed to disconnect connection, endpoint={},
err={}", endpoint, e);
- }
- }
- SecurityChecker.getInstance().stopSSRFChecking();
- }
- }
-
- /**
- * check WhiteList
- */
- public void checkWhiteList(String endpoint) throws UserException {
- endpoint = endpoint.replaceFirst("^http://", "");
- endpoint = endpoint.replaceFirst("^https://", "");
- List<String> whiteList = new
ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
- whiteList.removeIf(String::isEmpty);
- if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
- throw new UserException("endpoint: " + endpoint
- + " is not in s3 load endpoint white list: " +
String.join(",", whiteList));
- }
- }
-
/**
* this method is from StmtExecutor.handleLoadStmt()
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 3961ce5de44..2c4fb0651e6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -40,10 +40,12 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import
org.apache.doris.datasource.property.fileformat.TextFileFormatProperties;
+import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -151,6 +153,11 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
String path = getFilePath();
BrokerDesc brokerDesc = getBrokerDesc();
try {
+ if (brokerDesc.getFileType() != null &&
brokerDesc.getFileType().equals(TFileType.FILE_S3)) {
+ ObjectStorageProperties storageProperties =
(ObjectStorageProperties) brokerDesc.getStorageProperties();
+ String endpoint = storageProperties.getEndpoint();
+ S3Util.validateAndTestEndpoint(endpoint);
+ }
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
} catch (UserException e) {
throw new AnalysisException("parse file failed, err: " +
e.getMessage(), e);
diff --git a/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy
b/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy
new file mode 100644
index 00000000000..d5f69b81276
--- /dev/null
+++ b/regression-test/suites/load_p0/tvf/test_tvf_fast_fail.groovy
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_fast_fail", "p0,external,tvf") {
+    String enabled = context.config.otherConfigs.get("enableS3Test")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable S3 test.")
+        return;
+    }
+
+    def tableName = "tvf_fast_fail_test"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            r_reason_sk bigint,
+            r_reason_id char(16),
+            r_reason_desc char(100)
+        )
+        DUPLICATE KEY(r_reason_sk)
+        DISTRIBUTED BY HASH(r_reason_sk) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    // Test 1: Invalid endpoint (reserved ".invalid" TLD, RFC 2606, never resolves) should fail fast within 15s
+    def testInvalidEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'invalid-endpoint.invalid',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage() ?: ""
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+
+        logger.info("Test invalid endpoint - Duration: ${duration} seconds, Error: ${errorOccurred}")
+
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for invalid endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") ||
+                errorMessage.contains("invalid-endpoint.invalid"),
+                "Error message should indicate connection failure: ${errorMessage}")
+    }
+
+    // Test 2: Non-existent endpoint should fail fast
+    def testNonExistentEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'non-existent-s3-endpoint-12345.com',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage() ?: ""
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+
+        logger.info("Test non-existent endpoint - Duration: ${duration} seconds, Error: ${errorOccurred}")
+
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for non-existent endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") ||
+                errorMessage.contains("non-existent-s3-endpoint-12345.com"),
+                "Error message should indicate connection failure: ${errorMessage}")
+    }
+
+    // Test 3: Malformed endpoint should fail fast
+    def testMalformedEndpoint = {
+        def startTime = System.currentTimeMillis()
+        def errorOccurred = false
+        def errorMessage = ""
+
+        try {
+            sql """
+                INSERT INTO ${tableName}
+                SELECT * FROM S3(
+                    'uri' = 's3://test-bucket/test.csv',
+                    'access_key' = 'test_access_key',
+                    'secret_key' = 'test_secret_key',
+                    's3.endpoint' = 'malformed-url-without-domain',
+                    'region' = 'us-east-1',
+                    'format' = 'csv',
+                    'column_separator' = '|'
+                );
+            """
+        } catch (Exception e) {
+            errorOccurred = true
+            errorMessage = e.getMessage() ?: ""
+            logger.info("Expected error occurred: ${errorMessage}")
+        }
+
+        def endTime = System.currentTimeMillis()
+        def duration = (endTime - startTime) / 1000.0
+
+        logger.info("Test malformed endpoint - Duration: ${duration} seconds, Error: ${errorOccurred}")
+
+        // Assertions
+        assertTrue(errorOccurred, "Expected error for malformed endpoint")
+        assertTrue(duration < 15.0, "Fast fail should complete within 15 seconds, but took ${duration} seconds")
+        assertTrue(errorMessage.contains("Failed to access object storage") ||
+                errorMessage.contains("malformed-url-without-domain"),
+                "Error message should indicate connection failure: ${errorMessage}")
+    }
+
+    // Test 4: With a reachable endpoint (if configured), any failure must come from credentials/bucket, not connectivity
+    def testValidEndpointComparison = {
+        if (context.config.otherConfigs.get("s3Endpoint") != null) {
+            def validEndpoint = context.config.otherConfigs.get("s3Endpoint")
+            def startTime = System.currentTimeMillis()
+            def errorOccurred = false
+            def errorMessage = ""
+
+            try {
+                sql """
+                    INSERT INTO ${tableName}
+                    SELECT * FROM S3(
+                        'uri' = 's3://non-existent-bucket-12345/test.csv',
+                        'access_key' = 'test_access_key',
+                        'secret_key' = 'test_secret_key',
+                        's3.endpoint' = '${validEndpoint}',
+                        'region' = 'us-east-1',
+                        'format' = 'csv',
+                        'column_separator' = '|'
+                    );
+                """
+            } catch (Exception e) {
+                errorOccurred = true
+                errorMessage = e.getMessage() ?: ""
+                logger.info("Error with valid endpoint (expected due to invalid credentials): ${errorMessage}")
+            }
+
+            def endTime = System.currentTimeMillis()
+            def duration = (endTime - startTime) / 1000.0
+
+            logger.info("Test valid endpoint - Duration: ${duration} seconds, Error: ${errorOccurred}")
+
+            // With valid endpoint, the error should be about credentials/bucket, not connection
+            if (errorOccurred) {
+                assertFalse(errorMessage.contains("Failed to access object storage") &&
+                        errorMessage.contains(validEndpoint),
+                        "Valid endpoint should not have connection failure: ${errorMessage}")
+            }
+        } else {
+            logger.info("Skipping valid endpoint test - no valid endpoint configured")
+        }
+    }
+
+    // Run all tests
+    logger.info("=== Starting TVF Fast Fail Tests ===")
+
+    logger.info("Test 1: Invalid endpoint fast fail")
+    testInvalidEndpoint()
+
+    logger.info("Test 2: Non-existent endpoint fast fail")
+    testNonExistentEndpoint()
+
+    logger.info("Test 3: Malformed endpoint fast fail")
+    testMalformedEndpoint()
+
+    logger.info("Test 4: Valid endpoint comparison")
+    testValidEndpointComparison()
+
+    logger.info("=== TVF Fast Fail Tests Completed Successfully ===")
+
+    // Cleanup
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]