This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d2f65e288b [feat](iceberg) support aliyun dlf iceberg rest catalog 
(#60796)
9d2f65e288b is described below

commit 9d2f65e288bf06d96cf3f0277582fc2c53b46582
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Mar 2 15:24:04 2026 +0800

    [feat](iceberg) support aliyun dlf iceberg rest catalog (#60796)
    
    ### What problem does this PR solve?
    
    depends on #60856
    
    Support Aliyun DLF (Data Lake Formation) as an Iceberg REST catalog.
    
    Main changes:
    - Preserve case-sensitive REST signing names by removing toLowerCase() on
    `rest.signing-name`, so names like "DlfNext" work correctly.
    - Prioritize non-S3 storage properties when multiple S3-compatible
      properties are returned from iceberg rest catalog, to avoid
      incorrectly using S3 credentials for non-S3 storage (e.g. OSS).
    - Remove redundant resetToUninitialized() override in HMSExternalCatalog
      that only delegates to super.
    - Add IcebergDlfRestCatalogTest, IcebergRestPropertiesTest and
      StoragePropertiesTest for DLF REST catalog integration and property
      resolution testing.
    
    ```
    CREATE CATALOG ice PROPERTIES (
        'type' = 'iceberg',
        'warehouse' = 'new_dlf_iceberg_catalog',
        'iceberg.catalog.type' = 'rest',
        'iceberg.rest.uri' = 'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg',
        'iceberg.rest.sigv4-enabled' = 'true',
        'iceberg.rest.signing-name' = 'DlfNext',
        'iceberg.rest.access-key-id' = 'xx',
        'iceberg.rest.secret-access-key' = 'xx',
        'iceberg.rest.signing-region' = 'cn-beijing',
        'iceberg.rest.vended-credentials-enabled' = 'true',
        'io-impl' = 'org.apache.iceberg.rest.DlfFileIO',
        'fs.oss.support' = 'true'
    );
    ```
---
 build.sh                                           |  44 ++-
 .../apache/doris/datasource/CatalogProperty.java   |  25 +-
 .../AbstractVendedCredentialsProvider.java         |   2 +-
 .../credentials/VendedCredentialsFactory.java      |   2 +-
 .../doris/datasource/hive/HMSExternalCatalog.java  |   5 -
 .../datasource/iceberg/IcebergExternalCatalog.java |   4 +-
 .../iceberg/IcebergVendedCredentialsProvider.java  |  15 +-
 .../paimon/PaimonVendedCredentialsProvider.java    |   2 +-
 .../datasource/property/ConnectorProperty.java     |   2 +
 .../metastore/AbstractIcebergProperties.java       |  99 ++++-
 .../IcebergFileSystemMetaStoreProperties.java      |  32 +-
 .../metastore/IcebergGlueMetaStoreProperties.java  |  16 +-
 .../metastore/IcebergHMSMetaStoreProperties.java   |  42 +-
 .../metastore/IcebergJdbcMetaStoreProperties.java  |  55 +--
 .../property/metastore/IcebergRestProperties.java  |  78 +---
 .../IcebergS3TablesMetaStoreProperties.java        |  10 -
 .../property/metastore/MetastoreProperties.java    |   2 +
 .../TrinoConnectorPropertiesFactory.java}          |  22 +-
 .../storage/AbstractS3CompatibleProperties.java    |  37 ++
 .../datasource/property/storage/COSProperties.java |   1 +
 .../property/storage/MinioProperties.java          |   4 +
 .../datasource/property/storage/OBSProperties.java |   1 +
 .../datasource/property/storage/OSSProperties.java |   6 +-
 .../datasource/property/storage/S3Properties.java  |   1 +
 .../property/storage/StorageProperties.java        | 116 ++++--
 .../AbstractVendedCredentialsProviderTest.java     |   6 +-
 .../doris/datasource/iceberg/IcebergUtilsTest.java |   5 +
 .../metastore/IcebergDlfRestCatalogTest.java       | 124 ++++++
 .../IcebergJdbcMetaStorePropertiesTest.java        |   5 +-
 .../metastore/IcebergRestPropertiesTest.java       | 128 ++++++-
 .../IcebergUnityCatalogRestCatalogTest.java        |   1 -
 .../property/storage/COSPropertiesTest.java        |   8 +-
 .../property/storage/OBSPropertyTest.java          |   7 +-
 .../property/storage/OSSPropertiesTest.java        |   5 +-
 .../property/storage/OzonePropertiesTest.java      |  13 +-
 .../property/storage/S3PropertiesTest.java         |   4 +-
 .../property/storage/StoragePropertiesTest.java    | 421 +++++++++++++++++++++
 regression-test/conf/regression-conf.groovy        |   2 +
 .../iceberg/test_iceberg_dlf_rest_catalog.out      |  17 +
 .../iceberg/test_iceberg_dlf_rest_catalog.groovy   |  45 +++
 40 files changed, 1098 insertions(+), 316 deletions(-)

diff --git a/build.sh b/build.sh
index 03b265e1f2a..122be348ad4 100755
--- a/build.sh
+++ b/build.sh
@@ -474,6 +474,12 @@ if [[ "$(echo "${DISABLE_BUILD_AZURE}" | tr '[:lower:]' 
'[:upper:]')" == "ON" ]]
     BUILD_AZURE='OFF'
 fi
 
+if [[ "$(echo "${DISABLE_BUILD_JINDOFS}" | tr '[:lower:]' '[:upper:]')" == 
"ON" ]]; then
+    BUILD_JINDOFS='OFF'
+else
+    BUILD_JINDOFS='ON'
+fi
+
 if [[ -z "${ENABLE_INJECTION_POINT}" ]]; then
     ENABLE_INJECTION_POINT='OFF'
 fi
@@ -795,7 +801,9 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then
     cp -r -p "${DORIS_HOME}/conf/ldap.conf" "${DORIS_OUTPUT}/fe/conf"/
     cp -r -p "${DORIS_HOME}/conf/mysql_ssl_default_certificate" 
"${DORIS_OUTPUT}/fe/"/
     rm -rf "${DORIS_OUTPUT}/fe/lib"/*
-    install -d "${DORIS_OUTPUT}/fe/lib/jindofs"
+    if [[ "${BUILD_JINDOFS}" == "ON" ]]; then
+        install -d "${DORIS_OUTPUT}/fe/lib/jindofs"
+    fi
     cp -r -p "${DORIS_HOME}/fe/fe-core/target/lib"/* "${DORIS_OUTPUT}/fe/lib"/
     cp -r -p "${DORIS_HOME}/fe/fe-core/target/doris-fe.jar" 
"${DORIS_OUTPUT}/fe/lib"/
     if [[ "${WITH_TDE_DIR}" != "" ]]; then
@@ -805,13 +813,15 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then
     #cp -r -p "${DORIS_HOME}/docs/build/help-resource.zip" 
"${DORIS_OUTPUT}/fe/lib"/
 
     # copy jindofs jars, only support for Linux x64 or arm
-    if [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "${TARGET_ARCH}" == 'x86_64' 
]]; then
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-ubuntu22-x86_64-[0-9]*.jar
 "${DORIS_OUTPUT}/fe/lib/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
-    elif [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "${TARGET_ARCH}" == 
'aarch64' ]]; then
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-el7-aarch64-[0-9]*.jar
 "${DORIS_OUTPUT}/fe/lib/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
+    if [[ "${BUILD_JINDOFS}" == "ON" ]]; then
+        if [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "${TARGET_ARCH}" == 
'x86_64' ]]; then
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-ubuntu22-x86_64-[0-9]*.jar
 "${DORIS_OUTPUT}/fe/lib/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
+        elif [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "${TARGET_ARCH}" == 
'aarch64' ]]; then
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-el7-aarch64-[0-9]*.jar
 "${DORIS_OUTPUT}/fe/lib/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/fe/lib/jindofs"/
+        fi
     fi
 
     cp -r -p "${DORIS_HOME}/minidump" "${DORIS_OUTPUT}/fe"/
@@ -981,14 +991,16 @@ EOF
     done        
 
     # copy jindofs jars, only support for Linux x64 or arm
-    install -d "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
-    if [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "$TARGET_ARCH" == 'x86_64' 
]]; then
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-ubuntu22-x86_64-[0-9]*.jar
 "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
-    elif [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "$TARGET_ARCH" == 'aarch64' 
]]; then
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-el7-aarch64-[0-9]*.jar
 "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
-        cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+    if [[ "${BUILD_JINDOFS}" == "ON" ]]; then
+        install -d "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+        if [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "$TARGET_ARCH" == 
'x86_64' ]]; then
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-ubuntu22-x86_64-[0-9]*.jar
 "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+        elif [[ "${TARGET_SYSTEM}" == 'Linux' ]] && [[ "$TARGET_ARCH" == 
'aarch64' ]]; then
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-core-linux-el7-aarch64-[0-9]*.jar
 "${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+            cp -r -p 
"${DORIS_THIRDPARTY}"/installed/jindofs_libs/jindo-sdk-[0-9]*.jar 
"${DORIS_OUTPUT}/be/lib/java_extensions/jindofs"/
+        fi
     fi
 
     cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* 
"${DORIS_OUTPUT}/be/www"/
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
index 426d7aff001..540e23e1628 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java
@@ -18,10 +18,13 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.datasource.credentials.AbstractVendedCredentialsProvider;
+import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
 import org.apache.doris.datasource.property.metastore.MetastoreProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.collections4.MapUtils;
@@ -174,9 +177,20 @@ public class CatalogProperty {
             synchronized (this) {
                 if (storagePropertiesMap == null) {
                     try {
-                        this.orderedStoragePropertiesList = 
StorageProperties.createAll(getProperties());
-                        this.storagePropertiesMap = 
orderedStoragePropertiesList.stream()
-                                
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+                        boolean checkStorageProperties = true;
+                        AbstractVendedCredentialsProvider provider =
+                                
VendedCredentialsFactory.getProviderType(getMetastoreProperties());
+                        if (provider != null) {
+                            checkStorageProperties = 
!provider.isVendedCredentialsEnabled(getMetastoreProperties());
+                        }
+                        if (checkStorageProperties) {
+                            this.orderedStoragePropertiesList = 
StorageProperties.createAll(getProperties());
+                            this.storagePropertiesMap = 
orderedStoragePropertiesList.stream()
+                                    
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
+                        } else {
+                            this.orderedStoragePropertiesList = 
Lists.newArrayList();
+                            this.storagePropertiesMap = Maps.newHashMap();
+                        }
                     } catch (UserException e) {
                         LOG.warn("Failed to initialize catalog storage 
properties", e);
                         throw new RuntimeException("Failed to initialize 
storage properties, error: "
@@ -199,16 +213,13 @@ public class CatalogProperty {
 
     public void checkMetaStoreAndStorageProperties(Class msClass) {
         MetastoreProperties msProperties;
-        List<StorageProperties> storageProperties;
         try {
             msProperties = MetastoreProperties.create(getProperties());
-            storageProperties = StorageProperties.createAll(getProperties());
+            initStorageProperties();
         } catch (UserException e) {
             throw new RuntimeException("Failed to initialize Catalog 
properties, error: "
                     + ExceptionUtils.getRootCauseMessage(e), e);
         }
-        Preconditions.checkNotNull(storageProperties,
-                "Storage properties are not configured properly");
         Preconditions.checkNotNull(msProperties, "Metastore properties are not 
configured properly");
         Preconditions.checkArgument(
                 msClass.isInstance(msProperties),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProvider.java
index bf41f26cff2..105339f1292 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProvider.java
@@ -85,7 +85,7 @@ public abstract class AbstractVendedCredentialsProvider {
     /**
      * Check whether to enable vendor credentials (subclass implementation)
      */
-    protected abstract boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties);
+    public abstract boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties);
 
     /**
      * Extract original vendored credentials from table objects (subclass 
implementation)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/VendedCredentialsFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/VendedCredentialsFactory.java
index 6438a350b1c..6528fdb8929 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/VendedCredentialsFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/VendedCredentialsFactory.java
@@ -53,7 +53,7 @@ public class VendedCredentialsFactory {
     /**
      * Select the right provider according to the MetastoreProperties type
      */
-    private static AbstractVendedCredentialsProvider 
getProviderType(MetastoreProperties metastoreProperties) {
+    public static AbstractVendedCredentialsProvider 
getProviderType(MetastoreProperties metastoreProperties) {
         if (metastoreProperties == null) {
             return null;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index a0bad3a4516..fe86390b581 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -154,11 +154,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
         metadataOps = hiveOps;
     }
 
-    @Override
-    public synchronized void resetToUninitialized(boolean invalidCache) {
-        super.resetToUninitialized(invalidCache);
-    }
-
     @Override
     public void onClose() {
         super.onClose();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index ae237933378..04fda134c79 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -75,9 +75,7 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     protected void initCatalog() {
         try {
             msProperties = (AbstractIcebergProperties) 
catalogProperty.getMetastoreProperties();
-            this.catalog = msProperties.initializeCatalog(getName(), 
catalogProperty
-                    .getOrderedStoragePropertiesList());
-
+            this.catalog = msProperties.initializeCatalog(getName(), 
catalogProperty.getOrderedStoragePropertiesList());
             this.icebergCatalogType = msProperties.getIcebergCatalogType();
         } catch (ClassCastException e) {
             throw new RuntimeException("Invalid properties for Iceberg 
catalog: " + getProperties(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
index a4f0bf82c03..399a0408fbe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergVendedCredentialsProvider.java
@@ -23,6 +23,9 @@ import 
org.apache.doris.datasource.property.metastore.MetastoreProperties;
 
 import com.google.common.collect.Maps;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.io.SupportsStorageCredentials;
 
 import java.util.Map;
 
@@ -38,7 +41,7 @@ public class IcebergVendedCredentialsProvider extends 
AbstractVendedCredentialsP
     }
 
     @Override
-    protected boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
+    public boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
         if (metastoreProperties instanceof IcebergRestProperties) {
             return ((IcebergRestProperties) 
metastoreProperties).isIcebergRestVendedCredentialsEnabled();
         }
@@ -57,7 +60,15 @@ public class IcebergVendedCredentialsProvider extends 
AbstractVendedCredentialsP
         }
 
         // Return table.io().properties() directly, and let 
StorageProperties.createAll() to convert the format
-        return table.io().properties();
+        FileIO fileIO = table.io();
+        Map<String, String> ioProps = Maps.newHashMap(fileIO.properties());
+        if (fileIO instanceof SupportsStorageCredentials) {
+            SupportsStorageCredentials ssc = (SupportsStorageCredentials) 
fileIO;
+            for (StorageCredential storageCredential : ssc.credentials()) {
+                ioProps.putAll(storageCredential.config());
+            }
+        }
+        return ioProps;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonVendedCredentialsProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonVendedCredentialsProvider.java
index 19e95ed2fd8..0ea91a375c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonVendedCredentialsProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonVendedCredentialsProvider.java
@@ -40,7 +40,7 @@ public class PaimonVendedCredentialsProvider extends 
AbstractVendedCredentialsPr
     }
 
     @Override
-    protected boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
+    public boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
         // Paimon REST catalog always supports vended credentials if it's REST 
type
         return metastoreProperties instanceof PaimonRestMetaStoreProperties;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
index 8cda6f99b8b..7078f6c61a3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
@@ -29,4 +29,6 @@ public @interface ConnectorProperty {
     boolean supported() default true;
 
     boolean sensitive() default false;
+
+    boolean isRegionField() default false;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index d9777d10b39..8123e698727 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -21,11 +21,19 @@ import 
org.apache.doris.common.security.authentication.ExecutionAuthenticator;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.property.ConnectorProperty;
+import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.datasource.property.storage.S3Properties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Catalog;
 
 import java.util.HashMap;
@@ -82,6 +90,14 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
     )
     protected String ioManifestCacheMaxContentLength;
 
+    @Getter
+    @ConnectorProperty(
+            names = {CatalogProperties.FILE_IO_IMPL},
+            required = false,
+            description = "Custom io impl for iceberg"
+    )
+    protected String ioImpl;
+
     @Getter
     protected ExecutionAuthenticator executionAuthenticator = new 
ExecutionAuthenticator(){};
 
@@ -114,7 +130,7 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
      * and deleting Iceberg tables.
      */
     public final Catalog initializeCatalog(String catalogName,
-                                                        
List<StorageProperties> storagePropertiesList) {
+                                           List<StorageProperties> 
storagePropertiesList) {
         Map<String, String> catalogProps = new HashMap<>(getOrigProps());
         if (StringUtils.isNotBlank(warehouse)) {
             catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
@@ -179,4 +195,85 @@ public abstract class AbstractIcebergProperties extends 
MetastoreProperties {
             Map<String, String> catalogProps,
             List<StorageProperties> storagePropertiesList
     );
+
+    /**
+     * Unified method to configure FileIO properties for Iceberg catalog.
+     * This method handles all storage types (HDFS, S3, MinIO, etc.) by:
+     * 1. Adding all storage properties to Hadoop Configuration (for 
HadoopFileIO / S3A access).
+     * 2. Extracting S3-compatible properties into fileIOProperties map (for 
Iceberg S3FileIO).
+     *
+     * @param storagePropertiesList list of storage properties
+     * @param fileIOProperties options map to be populated with S3 FileIO 
properties
+     * @return Hadoop Configuration populated with all storage properties
+     */
+    public void toFileIOProperties(List<StorageProperties> 
storagePropertiesList,
+            Map<String, String> fileIOProperties, Configuration conf) {
+        // We only support one S3-compatible storage property for FileIO 
configuration.
+        // When multiple AbstractS3CompatibleProperties exist, prefer the 
first non-S3Properties one,
+        // because a non-S3 type (e.g. OSSProperties, COSProperties) indicates 
the user has explicitly
+        // specified a concrete S3-compatible storage, which should take 
priority over the generic S3Properties.
+        AbstractS3CompatibleProperties s3Fallback = null;
+        AbstractS3CompatibleProperties s3Target = null;
+        for (StorageProperties storageProperties : storagePropertiesList) {
+            if (conf != null && storageProperties.getHadoopStorageConfig() != 
null) {
+                conf.addResource(storageProperties.getHadoopStorageConfig());
+            }
+            if (storageProperties instanceof AbstractS3CompatibleProperties) {
+                if (s3Fallback == null) {
+                    s3Fallback = (AbstractS3CompatibleProperties) 
storageProperties;
+                }
+                if (s3Target == null && !(storageProperties instanceof 
S3Properties)) {
+                    s3Target = (AbstractS3CompatibleProperties) 
storageProperties;
+                }
+            }
+        }
+        AbstractS3CompatibleProperties chosen = s3Target != null ? s3Target : 
s3Fallback;
+        if (chosen != null) {
+            toS3FileIOProperties(chosen, fileIOProperties);
+        } else {
+            String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(fileIOProperties);
+            if (!Strings.isNullOrEmpty(region)) {
+                fileIOProperties.put(AwsClientProperties.CLIENT_REGION, 
region);
+            }
+        }
+    }
+
+    /**
+     * Configure S3 FileIO properties for all S3-compatible storage types (S3, 
MinIO, etc.)
+     * This method provides a unified way to convert S3-compatible properties 
to Iceberg S3FileIO format.
+     *
+     * @param s3Properties S3-compatible properties
+     * @param options Options map to be populated with S3 FileIO properties
+     */
+    private void toS3FileIOProperties(AbstractS3CompatibleProperties 
s3Properties, Map<String, String> options) {
+        // Common properties - only set if not blank
+        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
+            options.put(S3FileIOProperties.ENDPOINT, 
s3Properties.getEndpoint());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
+            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
s3Properties.getUsePathStyle());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
+            options.put(AwsClientProperties.CLIENT_REGION, 
s3Properties.getRegion());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
+            options.put(S3FileIOProperties.ACCESS_KEY_ID, 
s3Properties.getAccessKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
+            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
s3Properties.getSecretKey());
+        }
+        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
+            options.put(S3FileIOProperties.SESSION_TOKEN, 
s3Properties.getSessionToken());
+        }
+    }
+
+    protected Catalog buildIcebergCatalog(String catalogName, Map<String, 
String> options, Configuration conf) {
+        // For Iceberg SDK, "type" means catalog type, such as hive, jdbc, 
rest.
+        // But in Doris, "type" is "iceberg".
+        // And Iceberg SDK does not allow with both "type" and "catalog-impl" 
properties,
+        // So here we remove "type" and make sure "catalog-impl" is set.
+        options.remove(CatalogUtil.ICEBERG_CATALOG_TYPE);
+        
Preconditions.checkArgument(options.containsKey(CatalogProperties.CATALOG_IMPL));
+        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
index d644b7b06e0..3f7327b786c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java
@@ -24,8 +24,9 @@ import 
org.apache.doris.datasource.property.storage.StorageProperties;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 
 import java.util.List;
 import java.util.Map;
@@ -44,34 +45,20 @@ public class IcebergFileSystemMetaStoreProperties extends 
AbstractIcebergPropert
     @Override
     public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
                                List<StorageProperties> storagePropertiesList) {
-        Configuration configuration = 
buildConfiguration(storagePropertiesList);
-        HadoopCatalog catalog = new HadoopCatalog();
-        buildCatalogProps(storagePropertiesList);
-        catalog.setConf(configuration);
         try {
-            this.executionAuthenticator.execute(() -> {
-                catalog.initialize(catalogName, catalogProps);
-                return null;
-            });
+            Configuration configuration = new Configuration();
+            toFileIOProperties(storagePropertiesList, catalogProps, 
configuration);
+            catalogProps.put(CatalogProperties.CATALOG_IMPL, 
CatalogUtil.ICEBERG_CATALOG_HADOOP);
+            buildExecutionAuthenticator(storagePropertiesList);
+            return this.executionAuthenticator.execute(() ->
+                    buildIcebergCatalog(catalogName, catalogProps, 
configuration));
         } catch (Exception e) {
             throw new RuntimeException("Failed to initialize iceberg 
filesystem catalog: "
                     + ExceptionUtils.getRootCauseMessage(e), e);
         }
-
-        return catalog;
-    }
-
-    private Configuration buildConfiguration(List<StorageProperties> 
storagePropertiesList) {
-        Configuration configuration = new Configuration();
-        for (StorageProperties sp : storagePropertiesList) {
-            if (sp.getHadoopStorageConfig() != null) {
-                configuration.addResource(sp.getHadoopStorageConfig());
-            }
-        }
-        return configuration;
     }
 
-    private void buildCatalogProps(List<StorageProperties> 
storagePropertiesList) {
+    private void buildExecutionAuthenticator(List<StorageProperties> 
storagePropertiesList) {
         if (storagePropertiesList.size() == 1 && storagePropertiesList.get(0) 
instanceof HdfsProperties) {
             HdfsProperties hdfsProps = (HdfsProperties) 
storagePropertiesList.get(0);
             if (hdfsProps.isKerberos()) {
@@ -83,5 +70,4 @@ public class IcebergFileSystemMetaStoreProperties extends 
AbstractIcebergPropert
             }
         }
     }
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java
index 3039a96ea9d..1517a599094 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java
@@ -24,8 +24,8 @@ import 
org.apache.doris.datasource.property.storage.StorageProperties;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.aws.AwsProperties;
-import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Catalog;
 
@@ -65,15 +65,11 @@ public class IcebergGlueMetaStoreProperties extends 
AbstractIcebergProperties {
         appendS3Props(catalogProps);
         appendGlueProps(catalogProps);
         catalogProps.put("client.region", glueProperties.glueRegion);
-        if (StringUtils.isNotBlank(warehouse)) {
-            catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
-        } else {
-            catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, 
CHECKED_WAREHOUSE);
-        }
-
-        GlueCatalog catalog = new GlueCatalog();
-        catalog.initialize(catalogName, catalogProps);
-        return catalog;
+        catalogProps.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, 
CHECKED_WAREHOUSE);
+        // catalog type and catalog-impl cannot both be set, so drop the type
+        catalogProps.remove(CatalogUtil.ICEBERG_CATALOG_TYPE);
+        catalogProps.put(CatalogProperties.CATALOG_IMPL, 
CatalogUtil.ICEBERG_CATALOG_GLUE);
+        return buildIcebergCatalog(catalogName, catalogProps, null);
     }
 
     private void appendS3Props(Map<String, String> props) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
index dc6b4b448ae..7484a529490 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java
@@ -25,10 +25,11 @@ import 
org.apache.doris.datasource.property.storage.StorageProperties;
 import lombok.Getter;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hive.HiveCatalog;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -66,26 +67,11 @@ public class IcebergHMSMetaStoreProperties extends 
AbstractIcebergProperties {
     @Override
     public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
                                List<StorageProperties> storagePropertiesList) {
-        checkInitialized();
-        Configuration conf = buildHiveConfiguration(storagePropertiesList);
-        HiveCatalog hiveCatalog = new HiveCatalog();
-        hiveCatalog.setConf(conf);
-        storagePropertiesList.forEach(sp -> {
-            // NOTE: Custom FileIO implementation (KerberizedHadoopFileIO) is 
commented out by default.
-            // Using FileIO for Kerberos authentication may cause 
serialization issues when accessing
-            // Iceberg system tables (e.g., history, snapshots, manifests).
-            /*if (sp instanceof HdfsProperties) {
-                HdfsProperties hdfsProps = (HdfsProperties) sp;
-                if (hdfsProps.isKerberos()) {
-                    catalogProps.put(CatalogProperties.FILE_IO_IMPL,
-                            
"org.apache.doris.datasource.iceberg.fileio.DelegateFileIO");
-                }
-            }*/
-        });
-        buildCatalogProperties(catalogProps);
         try {
-            this.executionAuthenticator.execute(() -> 
hiveCatalog.initialize(catalogName, catalogProps));
-            return hiveCatalog;
+            catalogProps.put(CatalogProperties.CATALOG_IMPL, 
CatalogUtil.ICEBERG_CATALOG_HIVE);
+            Configuration conf = buildHiveConfiguration(storagePropertiesList);
+            return this.executionAuthenticator.execute(() ->
+                    buildIcebergCatalog(catalogName, catalogProps, conf));
         } catch (Exception e) {
             throw new RuntimeException("Failed to initialize HiveCatalog for 
Iceberg. "
                     + "CatalogName=" + catalogName + ", msg :" + 
ExceptionUtils.getRootCauseMessage(e), e);
@@ -105,20 +91,4 @@ public class IcebergHMSMetaStoreProperties extends 
AbstractIcebergProperties {
         }
         return conf;
     }
-
-    /**
-     * Constructs HiveCatalog's property map.
-     */
-    private void buildCatalogProperties(Map<String, String> catalogProps) {
-        Map<String, String> props = new HashMap<>();
-        catalogProps.put(HiveCatalog.LIST_ALL_TABLES, 
String.valueOf(listAllTables));
-        props.put("uri", hmsBaseProperties.getHiveMetastoreUri());
-    }
-
-    private void checkInitialized() {
-        if (hmsBaseProperties == null) {
-            throw new IllegalStateException("HMS properties not initialized."
-                    + " You must call initNormalizeAndCheckProps() first.");
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
index 5c81532edd4..ecd74ee5fe1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStoreProperties.java
@@ -20,16 +20,12 @@ package org.apache.doris.datasource.property.metastore;
 import org.apache.doris.catalog.JdbcResource;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.property.ConnectorProperty;
-import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
-import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.aws.AwsClientProperties;
-import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -136,13 +132,9 @@ public class IcebergJdbcMetaStoreProperties extends 
AbstractIcebergProperties {
     @Override
     public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
             List<StorageProperties> storagePropertiesList) {
-        Map<String, String> fileIOProperties = Maps.newHashMap();
-        Configuration conf = new Configuration();
-        toFileIOProperties(storagePropertiesList, fileIOProperties, conf);
-
-        Map<String, String> options = 
Maps.newHashMap(getIcebergJdbcCatalogProperties());
-        options.putAll(fileIOProperties);
-
+        catalogProps.putAll(getIcebergJdbcCatalogProperties());
+        Configuration configuration = new Configuration();
+        toFileIOProperties(storagePropertiesList, catalogProps, configuration);
         // Support dynamic JDBC driver loading
         // We need to register the driver with DriverManager because Iceberg 
uses DriverManager.getConnection()
         // which doesn't respect Thread.contextClassLoader
@@ -150,7 +142,7 @@ public class IcebergJdbcMetaStoreProperties extends 
AbstractIcebergProperties {
             registerJdbcDriver(driverUrl, driverClass);
             LOG.info("Using dynamic JDBC driver from: {}", driverUrl);
         }
-        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
+        return buildIcebergCatalog(catalogName, catalogProps, configuration);
     }
 
     /**
@@ -246,11 +238,8 @@ public class IcebergJdbcMetaStoreProperties extends 
AbstractIcebergProperties {
 
     private void initIcebergJdbcCatalogProperties() {
         icebergJdbcCatalogProperties = new HashMap<>();
-        icebergJdbcCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC);
+        icebergJdbcCatalogProperties.put(CatalogProperties.CATALOG_IMPL, 
CatalogUtil.ICEBERG_CATALOG_JDBC);
         icebergJdbcCatalogProperties.put(CatalogProperties.URI, uri);
-        if (StringUtils.isNotBlank(warehouse)) {
-            
icebergJdbcCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, 
warehouse);
-        }
         addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.user", jdbcUser);
         addIfNotBlank(icebergJdbcCatalogProperties, "jdbc.password", 
jdbcPassword);
         addIfNotBlank(icebergJdbcCatalogProperties, 
"jdbc.init-catalog-tables", jdbcInitCatalogTables);
@@ -273,38 +262,4 @@ public class IcebergJdbcMetaStoreProperties extends 
AbstractIcebergProperties {
             props.put(key, value);
         }
     }
-
-    private static void toFileIOProperties(List<StorageProperties> 
storagePropertiesList,
-            Map<String, String> fileIOProperties, Configuration conf) {
-        for (StorageProperties storageProperties : storagePropertiesList) {
-            if (storageProperties instanceof AbstractS3CompatibleProperties) {
-                toS3FileIOProperties((AbstractS3CompatibleProperties) 
storageProperties, fileIOProperties);
-            }
-            if (storageProperties.getHadoopStorageConfig() != null) {
-                conf.addResource(storageProperties.getHadoopStorageConfig());
-            }
-        }
-    }
-
-    private static void toS3FileIOProperties(AbstractS3CompatibleProperties 
s3Properties,
-            Map<String, String> options) {
-        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
-            options.put(S3FileIOProperties.ENDPOINT, 
s3Properties.getEndpoint());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
-            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
s3Properties.getUsePathStyle());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
-            options.put(AwsClientProperties.CLIENT_REGION, 
s3Properties.getRegion());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
-            options.put(S3FileIOProperties.ACCESS_KEY_ID, 
s3Properties.getAccessKey());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
-            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
s3Properties.getSecretKey());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
-            options.put(S3FileIOProperties.SESSION_TOKEN, 
s3Properties.getSessionToken());
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
index 688a268522b..7100d269262 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java
@@ -20,17 +20,12 @@ package org.apache.doris.datasource.property.metastore;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.ParamRules;
-import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
-import com.google.common.collect.Maps;
 import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.aws.AwsClientProperties;
-import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.logging.log4j.util.Strings;
@@ -176,16 +171,11 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
     @Override
     public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
             List<StorageProperties> storagePropertiesList) {
-        Map<String, String> fileIOProperties = Maps.newHashMap();
-        Configuration conf = new Configuration();
-        toFileIOProperties(storagePropertiesList, fileIOProperties, conf);
-
-        // 3. Merge properties for REST catalog service.
-        Map<String, String> options = 
Maps.newHashMap(getIcebergRestCatalogProperties());
-        options.putAll(fileIOProperties);
-
+        catalogProps.putAll(getIcebergRestCatalogProperties());
+        Configuration configuration = new Configuration();
+        toFileIOProperties(storagePropertiesList, catalogProps, configuration);
         // 4. Build iceberg catalog
-        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
+        return buildIcebergCatalog(catalogName, catalogProps, configuration);
     }
 
     @Override
@@ -253,7 +243,7 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
 
     private void addCoreCatalogProperties() {
         // See CatalogUtil.java
-        icebergRestCatalogProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+        icebergRestCatalogProperties.put(CatalogProperties.CATALOG_IMPL, 
CatalogUtil.ICEBERG_CATALOG_REST);
         // See CatalogProperties.java
         icebergRestCatalogProperties.put(CatalogProperties.URI, 
icebergRestUri);
     }
@@ -306,7 +296,8 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
 
     private void addGlueRestCatalogProperties() {
         if (Strings.isNotBlank(icebergRestSigningName)) {
-            icebergRestCatalogProperties.put("rest.signing-name", 
icebergRestSigningName.toLowerCase());
+            // signing-name is case sensitive, do not call toLowerCase() on it
+            icebergRestCatalogProperties.put("rest.signing-name", 
icebergRestSigningName);
             icebergRestCatalogProperties.put("rest.sigv4-enabled", 
icebergRestSigV4Enabled);
             icebergRestCatalogProperties.put("rest.access-key-id", 
icebergRestAccessKeyId);
             icebergRestCatalogProperties.put("rest.secret-access-key", 
icebergRestSecretAccessKey);
@@ -314,7 +305,6 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
         }
     }
 
-
     public Map<String, String> getIcebergRestCatalogProperties() {
         return Collections.unmodifiableMap(icebergRestCatalogProperties);
     }
@@ -327,60 +317,6 @@ public class IcebergRestProperties extends 
AbstractIcebergProperties {
         return Boolean.parseBoolean(icebergRestNestedNamespaceEnabled);
     }
 
-    /**
-     * Unified method to configure FileIO properties for Iceberg catalog.
-     * This method handles all storage types (HDFS, S3, MinIO, etc.) and 
populates
-     * the fileIOProperties map and Configuration object accordingly.
-     *
-     * @param storagePropertiesList Map of storage properties
-     * @param fileIOProperties Options map to be populated
-     * @param conf Configuration object to be populated (for HDFS), will be 
created if null and HDFS is used
-     */
-    public void toFileIOProperties(List<StorageProperties> 
storagePropertiesList,
-            Map<String, String> fileIOProperties, Configuration conf) {
-
-        for (StorageProperties storageProperties : storagePropertiesList) {
-            if (storageProperties instanceof AbstractS3CompatibleProperties) {
-                // For all S3-compatible storage types, put properties in 
fileIOProperties map
-                toS3FileIOProperties((AbstractS3CompatibleProperties) 
storageProperties, fileIOProperties);
-            } else {
-                // For other storage types, just use fileIOProperties map
-                conf.addResource(storageProperties.getHadoopStorageConfig());
-            }
-        }
-
-    }
-
-    /**
-     * Configure S3 FileIO properties for all S3-compatible storage types (S3, 
MinIO, etc.)
-     * This method provides a unified way to convert S3-compatible properties 
to Iceberg S3FileIO format.
-     *
-     * @param s3Properties S3-compatible properties
-     * @param options Options map to be populated with S3 FileIO properties
-     */
-    public void toS3FileIOProperties(AbstractS3CompatibleProperties 
s3Properties, Map<String, String> options) {
-        // Common properties - only set if not blank
-        if (StringUtils.isNotBlank(s3Properties.getEndpoint())) {
-            options.put(S3FileIOProperties.ENDPOINT, 
s3Properties.getEndpoint());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getUsePathStyle())) {
-            options.put(S3FileIOProperties.PATH_STYLE_ACCESS, 
s3Properties.getUsePathStyle());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getRegion())) {
-            options.put(AwsClientProperties.CLIENT_REGION, 
s3Properties.getRegion());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getAccessKey())) {
-            options.put(S3FileIOProperties.ACCESS_KEY_ID, 
s3Properties.getAccessKey());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getSecretKey())) {
-            options.put(S3FileIOProperties.SECRET_ACCESS_KEY, 
s3Properties.getSecretKey());
-        }
-        if (StringUtils.isNotBlank(s3Properties.getSessionToken())) {
-            options.put(S3FileIOProperties.SESSION_TOKEN, 
s3Properties.getSessionToken());
-        }
-    }
-
-
     public enum Security {
         NONE,
         OAUTH2,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java
index 0b101623c2b..58edd445f23 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java
@@ -51,10 +51,7 @@ public class IcebergS3TablesMetaStoreProperties extends 
AbstractIcebergPropertie
     @Override
     public Catalog initCatalog(String catalogName, Map<String, String> 
catalogProps,
                                List<StorageProperties> storagePropertiesList) {
-        checkInitialized();
-
         buildS3CatalogProperties(catalogProps);
-
         S3TablesCatalog catalog = new S3TablesCatalog();
         try {
             catalog.initialize(catalogName, catalogProps);
@@ -73,11 +70,4 @@ public class IcebergS3TablesMetaStoreProperties extends 
AbstractIcebergPropertie
         props.put("client.credentials-provider.s3.session-token", 
s3Properties.getSessionToken());
         props.put("client.region", s3Properties.getRegion());
     }
-
-    private void checkInitialized() {
-        if (s3Properties == null) {
-            throw new IllegalStateException("S3Properties not initialized."
-                    + " Please call initNormalizeAndCheckProps() before 
using.");
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
index fc0e98de077..3f4fa596ba7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/MetastoreProperties.java
@@ -51,6 +51,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
         DLF("dlf"),
         DATAPROC("dataproc"),
         FILE_SYSTEM("filesystem", "hadoop"),
+        TRINO_CONNECTOR("trino-connector"),
         UNKNOWN();
 
         private final Set<String> aliases;
@@ -85,6 +86,7 @@ public class MetastoreProperties extends ConnectionProperties 
{
         register(Type.HMS, new HivePropertiesFactory());
         register(Type.ICEBERG, new IcebergPropertiesFactory());
         register(Type.PAIMON, new PaimonPropertiesFactory());
+        register(Type.TRINO_CONNECTOR, new TrinoConnectorPropertiesFactory());
     }
 
     public static void register(Type type, MetastorePropertiesFactory factory) 
{
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/TrinoConnectorPropertiesFactory.java
similarity index 65%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/TrinoConnectorPropertiesFactory.java
index 8cda6f99b8b..5bdbafc58bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectorProperty.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/TrinoConnectorPropertiesFactory.java
@@ -15,18 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.property;
+package org.apache.doris.datasource.property.metastore;
 
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
+import org.apache.doris.datasource.property.metastore.MetastoreProperties.Type;
 
-@Retention(RetentionPolicy.RUNTIME)
-public @interface ConnectorProperty {
-    String[] names() default {};
-    String description() default "";
+import java.util.Map;
 
-    boolean required() default true;
-    boolean supported() default true;
-
-    boolean sensitive() default false;
+/**
+ * Just a placeholder
+ */
+public class TrinoConnectorPropertiesFactory extends 
AbstractMetastorePropertiesFactory {
+    @Override
+    public MetastoreProperties create(Map<String, String> props) {
+        return new MetastoreProperties(Type.TRINO_CONNECTOR, props);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index f8f7f6fad6c..7d7e5ffb1b6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -18,6 +18,8 @@
 package org.apache.doris.datasource.property.storage;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.ConnectorPropertiesUtils;
+import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.common.AwsCredentialsProviderMode;
 
 import com.google.common.base.Preconditions;
@@ -31,7 +33,10 @@ import 
software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 
+import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -289,6 +294,38 @@ public abstract class AbstractS3CompatibleProperties 
extends StorageProperties i
         hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle());
     }
 
+    /**
+     * Searches for a region value from the given properties map by scanning 
all known
+     * S3-compatible subclass region field annotations.
+     * <p>
+     * This method iterates through all known subclasses of {@link 
AbstractS3CompatibleProperties},
+     * finds fields annotated with {@code @ConnectorProperty(isRegionField = 
true)},
+     * and checks if any of the annotation's {@code names} exist in the 
provided properties map.
+     *
+     * @param props the property map to search for region values
+     * @return the region value if found, or {@code null} if no region 
property is present
+     */
+    public static String getRegionFromProperties(Map<String, String> props) {
+        List<Class<? extends AbstractS3CompatibleProperties>> subClasses = 
Arrays.asList(
+                S3Properties.class, OSSProperties.class, COSProperties.class,
+                OBSProperties.class, MinioProperties.class);
+        for (Class<?> clazz : subClasses) {
+            List<Field> fields = 
ConnectorPropertiesUtils.getConnectorProperties(clazz);
+            for (Field field : fields) {
+                ConnectorProperty annotation = 
field.getAnnotation(ConnectorProperty.class);
+                if (annotation != null && annotation.isRegionField()) {
+                    for (String name : annotation.names()) {
+                        String value = props.get(name);
+                        if (StringUtils.isNotBlank(value)) {
+                            return value;
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
     @Override
     public String getStorageName() {
         return "S3";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index e52762472f6..80c94fa3efd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -47,6 +47,7 @@ public class COSProperties extends 
AbstractS3CompatibleProperties {
     @Setter
     @ConnectorProperty(names = {"cos.region", "s3.region", "AWS_REGION", 
"region", "REGION"},
             required = false,
+            isRegionField = true,
             description = "The region of COS.")
     protected String region = "";
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
index 73969b99c64..cec9ca23e2d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -36,6 +36,10 @@ public class MinioProperties extends 
AbstractS3CompatibleProperties {
     protected String endpoint = "";
     @Getter
     @Setter
+    @ConnectorProperty(names = {"minio.region", "s3.region", "AWS_REGION", 
"region", "REGION"},
+            required = false,
+            isRegionField = true,
+            description = "The region of MinIO.")
     protected String region = "us-east-1";
 
     @Getter
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 25afb2bbf03..2b2b886c524 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -62,6 +62,7 @@ public class OBSProperties extends 
AbstractS3CompatibleProperties {
     @Getter
     @Setter
     @ConnectorProperty(names = {"obs.region", "s3.region", "AWS_REGION", 
"region", "REGION"}, required = false,
+            isRegionField = true,
             description = "The region of OBS.")
     protected String region;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index e92a925811c..a41f08b2401 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -70,8 +70,10 @@ public class OSSProperties extends 
AbstractS3CompatibleProperties {
 
     @Getter
     @Setter
-    @ConnectorProperty(names = {"oss.region", "s3.region", "AWS_REGION", 
"region", "REGION", "dlf.region"},
+    @ConnectorProperty(names = {"oss.region", "s3.region", "AWS_REGION", 
"region", "REGION", "dlf.region",
+        "iceberg.rest.signing-region"},
             required = false,
+            isRegionField = true,
             description = "The region of OSS.")
     protected String region;
 
@@ -177,7 +179,7 @@ public class OSSProperties extends 
AbstractS3CompatibleProperties {
 
     protected static boolean guessIsMe(Map<String, String> origProps) {
         String value = Stream.of("oss.endpoint", "s3.endpoint", 
"AWS_ENDPOINT", "endpoint", "ENDPOINT",
-                        "dlf.endpoint", "dlf.catalog.endpoint", 
"fs.oss.endpoint")
+                        "dlf.endpoint", "dlf.catalog.endpoint", 
"fs.oss.endpoint", "fs.oss.accessKeyId")
                 .map(origProps::get)
                 .filter(Objects::nonNull)
                 .findFirst()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index df09803be61..6bfb4f462ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -81,6 +81,7 @@ public class S3Properties extends 
AbstractS3CompatibleProperties {
     @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "REGION", 
"aws.region", "glue.region",
             "aws.glue.region", "iceberg.rest.signing-region", "client.region"},
             required = false,
+            isRegionField = true,
             description = "The region of S3.")
     protected String region = "";
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 0464dabbc67..687451ee1ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -34,7 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 
 public abstract class StorageProperties extends ConnectionProperties {
 
@@ -136,14 +136,19 @@ public abstract class StorageProperties extends 
ConnectionProperties {
      */
     public static List<StorageProperties> createAll(Map<String, String> 
origProps) throws UserException {
         List<StorageProperties> result = new ArrayList<>();
-        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
-            StorageProperties p = func.apply(origProps);
+        // If the user has explicitly specified any fs.xx.support=true, 
disable guessIsMe heuristics
+        // for all providers to avoid false-positive matches from ambiguous 
endpoint strings.
+        boolean useGuess = !hasAnyExplicitFsSupport(origProps);
+        for (BiFunction<Map<String, String>, Boolean, StorageProperties> func 
: PROVIDERS) {
+            StorageProperties p = func.apply(origProps, useGuess);
             if (p != null) {
                 result.add(p);
             }
         }
-        // Add default HDFS storage if not explicitly configured
-        if (result.stream().noneMatch(HdfsProperties.class::isInstance)) {
+        // When no explicit fs.xx.support flag is set, add a default HDFS 
storage as fallback.
+        // When the user has explicitly declared providers via 
fs.xx.support=true, skip the
+        // default HDFS to avoid injecting an unwanted provider into the 
result.
+        if (useGuess && 
result.stream().noneMatch(HdfsProperties.class::isInstance)) {
             result.add(0, new HdfsProperties(origProps, false));
         }
 
@@ -165,8 +170,11 @@ public abstract class StorageProperties extends 
ConnectionProperties {
      * @throws RuntimeException if no supported storage type is found
      */
     public static StorageProperties createPrimary(Map<String, String> 
origProps) {
-        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
-            StorageProperties p = func.apply(origProps);
+        // If the user has explicitly specified any fs.xx.support=true, 
disable guessIsMe heuristics
+        // for all providers to avoid false-positive matches from ambiguous 
endpoint strings.
+        boolean useGuess = !hasAnyExplicitFsSupport(origProps);
+        for (BiFunction<Map<String, String>, Boolean, StorageProperties> func 
: PROVIDERS) {
+            StorageProperties p = func.apply(origProps, useGuess);
             if (p != null) {
                 p.initNormalizeAndCheckProps();
                 p.buildHadoopStorageConfig();
@@ -176,44 +184,57 @@ public abstract class StorageProperties extends 
ConnectionProperties {
         throw new StoragePropertiesException("No supported storage type found. 
Please check your configuration.");
     }
 
-    private static final List<Function<Map<String, String>, 
StorageProperties>> PROVIDERS =
+    /**
+     * Registry of all supported storage provider detection functions.
+     * <p>
+     * Each entry is a {@link BiFunction} that takes:
+     * <ul>
+     *   <li>{@code props} — the user-supplied property map</li>
+     *   <li>{@code guess} — whether heuristic-based {@code guessIsMe} 
detection is enabled.
+     *       When {@code false}, only explicit {@code fs.xx.support=true} 
flags are honored,
+     *       preventing endpoint-based heuristics from causing false-positive 
matches
+     *       across providers (e.g., an {@code aliyuncs.com} endpoint 
accidentally
+     *       matching both OSS and S3).</li>
+     * </ul>
+     * Returns a {@link StorageProperties} instance if the provider matches, 
or {@code null} otherwise.
+     */
+    private static final List<BiFunction<Map<String, String>, Boolean, 
StorageProperties>> PROVIDERS =
             Arrays.asList(
-                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
-                            || HdfsProperties.guessIsMe(props)) ? new 
HdfsProperties(props) : null,
-                    props -> {
+                    (props, guess) -> (isFsSupport(props, FS_HDFS_SUPPORT)
+                            || (guess && HdfsProperties.guessIsMe(props))) ? 
new HdfsProperties(props) : null,
+                    (props, guess) -> {
                         // OSS-HDFS and OSS are mutually exclusive - check 
OSS-HDFS first
                         if ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
                                 || isFsSupport(props, 
DEPRECATED_OSS_HDFS_SUPPORT))
-                                || OSSHdfsProperties.guessIsMe(props)) {
+                                || (guess && 
OSSHdfsProperties.guessIsMe(props))) {
                             return new OSSHdfsProperties(props);
                         }
                         // Only check for regular OSS if OSS-HDFS is not 
enabled
                         if (isFsSupport(props, FS_OSS_SUPPORT)
-                                || OSSProperties.guessIsMe(props)) {
+                                || (guess && OSSProperties.guessIsMe(props))) {
                             return new OSSProperties(props);
                         }
                         return null;
                     },
-                    props -> (isFsSupport(props, FS_S3_SUPPORT)
-                            || S3Properties.guessIsMe(props)) ? new 
S3Properties(props) : null,
-                    props -> (isFsSupport(props, FS_OBS_SUPPORT)
-                            || OBSProperties.guessIsMe(props)) ? new 
OBSProperties(props) : null,
-                    props -> (isFsSupport(props, FS_COS_SUPPORT)
-                            || COSProperties.guessIsMe(props)) ? new 
COSProperties(props) : null,
-                    props -> (isFsSupport(props, FS_GCS_SUPPORT)
-                            || GCSProperties.guessIsMe(props)) ? new 
GCSProperties(props) : null,
-                    props -> (isFsSupport(props, FS_AZURE_SUPPORT)
-                            || AzureProperties.guessIsMe(props)) ? new 
AzureProperties(props) : null,
-                    props -> (isFsSupport(props, FS_MINIO_SUPPORT)
-                            || (!isFsSupport(props, FS_OZONE_SUPPORT)
-                            && MinioProperties.guessIsMe(props))) ? new 
MinioProperties(props) : null,
-                    props -> isFsSupport(props, FS_OZONE_SUPPORT) ? new 
OzoneProperties(props) : null,
-                    props -> (isFsSupport(props, FS_BROKER_SUPPORT)
-                            || BrokerProperties.guessIsMe(props)) ? new 
BrokerProperties(props) : null,
-                    props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
-                            || LocalProperties.guessIsMe(props)) ? new 
LocalProperties(props) : null,
-                    props -> (isFsSupport(props, FS_HTTP_SUPPORT)
-                            || HttpProperties.guessIsMe(props)) ? new 
HttpProperties(props) : null
+                    (props, guess) -> (isFsSupport(props, FS_S3_SUPPORT)
+                            || (guess && S3Properties.guessIsMe(props))) ? new 
S3Properties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_OBS_SUPPORT)
+                            || (guess && OBSProperties.guessIsMe(props))) ? 
new OBSProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_COS_SUPPORT)
+                            || (guess && COSProperties.guessIsMe(props))) ? 
new COSProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_GCS_SUPPORT)
+                            || (guess && GCSProperties.guessIsMe(props))) ? 
new GCSProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_AZURE_SUPPORT)
+                            || (guess && AzureProperties.guessIsMe(props))) ? 
new AzureProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_MINIO_SUPPORT)
+                            || (guess && MinioProperties.guessIsMe(props))) ? 
new MinioProperties(props) : null,
+                    (props, guess) -> isFsSupport(props, FS_OZONE_SUPPORT) ? 
new OzoneProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_BROKER_SUPPORT)
+                            || (guess && BrokerProperties.guessIsMe(props))) ? 
new BrokerProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_LOCAL_SUPPORT)
+                            || (guess && LocalProperties.guessIsMe(props))) ? 
new LocalProperties(props) : null,
+                    (props, guess) -> (isFsSupport(props, FS_HTTP_SUPPORT)
+                            || (guess && HttpProperties.guessIsMe(props))) ? 
new HttpProperties(props) : null
             );
 
     protected StorageProperties(Type type, Map<String, String> origProps) {
@@ -225,6 +246,35 @@ public abstract class StorageProperties extends 
ConnectionProperties {
         return origProps.getOrDefault(fsEnable, 
"false").equalsIgnoreCase("true");
     }
 
+    /**
+     * Checks whether the user has explicitly set any {@code 
fs.xx.support=true} property.
+     * <p>
+     * When at least one explicit {@code fs.xx.support} flag is present, the 
system should
+     * rely solely on these flags for provider matching and skip the 
heuristic-based
+     * {@code guessIsMe} inference. This prevents ambiguous endpoint strings 
(e.g.,
+     * {@code aliyuncs.com}) from accidentally triggering multiple providers 
(e.g.,
+     * both OSS and S3) at the same time.
+     *
+     * @param props the raw property map from user configuration
+     * @return {@code true} if any {@code fs.xx.support} property is 
explicitly set to "true"
+     */
+    private static boolean hasAnyExplicitFsSupport(Map<String, String> props) {
+        return isFsSupport(props, FS_HDFS_SUPPORT)
+                || isFsSupport(props, FS_S3_SUPPORT)
+                || isFsSupport(props, FS_GCS_SUPPORT)
+                || isFsSupport(props, FS_MINIO_SUPPORT)
+                || isFsSupport(props, FS_BROKER_SUPPORT)
+                || isFsSupport(props, FS_AZURE_SUPPORT)
+                || isFsSupport(props, FS_OSS_SUPPORT)
+                || isFsSupport(props, FS_OBS_SUPPORT)
+                || isFsSupport(props, FS_COS_SUPPORT)
+                || isFsSupport(props, FS_OSS_HDFS_SUPPORT)
+                || isFsSupport(props, FS_LOCAL_SUPPORT)
+                || isFsSupport(props, FS_HTTP_SUPPORT)
+                || isFsSupport(props, FS_OZONE_SUPPORT)
+                || isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT);
+    }
+
     protected static boolean checkIdentifierKey(Map<String, String> origProps, 
List<Field> fields) {
         for (Field field : fields) {
             field.setAccessible(true);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProviderTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProviderTest.java
index 038c9ad9755..7b2f123f953 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProviderTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/credentials/AbstractVendedCredentialsProviderTest.java
@@ -51,7 +51,7 @@ public class AbstractVendedCredentialsProviderTest {
         }
 
         @Override
-        protected boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
+        public boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
             return isVendedCredentialsEnabledResult;
         }
 
@@ -199,7 +199,7 @@ public class AbstractVendedCredentialsProviderTest {
         // Test the case where extractRawVendedCredentials returns null 
(simulating an internal failure)
         AbstractVendedCredentialsProvider provider = new 
AbstractVendedCredentialsProvider() {
             @Override
-            protected boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
+            public boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
                 return true;
             }
 
@@ -230,7 +230,7 @@ public class AbstractVendedCredentialsProviderTest {
         // Create a minimal provider that doesn't override getTableName() to 
test the default implementation
         AbstractVendedCredentialsProvider provider = new 
AbstractVendedCredentialsProvider() {
             @Override
-            protected boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
+            public boolean isVendedCredentialsEnabled(MetastoreProperties 
metastoreProperties) {
                 return true;
             }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
index e4ee0ec5e46..25fdc37fca4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
@@ -74,6 +74,8 @@ public class IcebergUtilsTest {
                     new IcebergHMSExternalCatalog(1, "name", null,
                             new HashMap<String, String>() {{
                                     put("list-all-tables", "true");
+                                    put("type", "hms");
+                                    put("hive.metastore.uris", 
"http://127.1.1.0:9000";);
                                 }},
                             "");
             HiveCatalog i2 = IcebergUtils.createIcebergHiveCatalog(c2, "i1");
@@ -83,11 +85,14 @@ public class IcebergUtilsTest {
                     new IcebergHMSExternalCatalog(1, "name", null,
                             new HashMap<String, String>() {{
                                     put("list-all-tables", "false");
+                                    put("type", "hms");
+                                    put("hive.metastore.uris", 
"http://127.1.1.0:9000";);
                                 }},
                         "");
             HiveCatalog i3 = IcebergUtils.createIcebergHiveCatalog(c3, "i1");
             Assert.assertFalse(getListAllTables(i3));
         } catch (Exception e) {
+            e.printStackTrace();
             Assert.fail();
         }
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergDlfRestCatalogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergDlfRestCatalogTest.java
new file mode 100644
index 00000000000..38c136796c0
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergDlfRestCatalogTest.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.metastore;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+@Disabled("set your aliyun's access key, secret key before running the test")
+public class IcebergDlfRestCatalogTest {
+
+    private String ak = "ak";
+    private String sk = "sk";
+
+    @Test
+    public void testIcebergDlRestCatalog() {
+        Catalog dlfRestCatalog = initIcebergDlfRestCatalog();
+        SupportsNamespaces nsCatalog = (SupportsNamespaces) dlfRestCatalog;
+        // List namespaces and assert
+        nsCatalog.listNamespaces(Namespace.empty()).forEach(namespace1 -> {
+            System.out.println("Namespace: " + namespace1);
+            Assertions.assertNotNull(namespace1, "Namespace should not be 
null");
+
+            dlfRestCatalog.listTables(namespace1).forEach(tableIdentifier -> {
+                System.out.println("Table: " + tableIdentifier.name());
+                Assertions.assertNotNull(tableIdentifier, "TableIdentifier 
should not be null");
+
+                // Load table history and assert
+                Table iceTable = dlfRestCatalog.loadTable(tableIdentifier);
+                iceTable.history().forEach(snapshot -> {
+                    System.out.println("Snapshot: " + snapshot);
+                    Assertions.assertNotNull(snapshot, "Snapshot should not be 
null");
+                });
+
+                CloseableIterable<FileScanTask> tasks = 
iceTable.newScan().planFiles();
+                tasks.forEach(task -> {
+                    System.out.println("FileScanTask: " + task);
+                    Assertions.assertNotNull(task, "FileScanTask should not be 
null");
+                });
+            });
+        });
+    }
+
+    private Catalog initIcebergDlfRestCatalog() {
+        Map<String, String> options = Maps.newHashMap();
+        options.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+        options.put(CatalogProperties.URI, 
"http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg";);
+        options.put(CatalogProperties.WAREHOUSE_LOCATION, 
"new_dlf_iceberg_catalog");
+        // remove this endpoint prop, or, add https://
+        // must set:
+        // software.amazon.awssdk.core.exception.SdkClientException: Unable to 
load region from any of the providers in
+        // the chain 
software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain@627ff1b8:
+        // 
[software.amazon.awssdk.regions.providers.SystemSettingsRegionProvider@67d32a54:
+        // Unable to load region from system settings. Region must be 
specified either via environment variable
+        // (AWS_REGION) or  system property (aws.region).,
+        // 
software.amazon.awssdk.regions.providers.AwsProfileRegionProvider@2792b416: No 
region provided in profile:
+        // default, 
software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@5cff6b74:
+        // Unable to contact EC2 metadata service.]
+        // options.put(AwsClientProperties.CLIENT_REGION, "cn-beijing");
+        // Forbidden: {"message":"Missing Authentication Token"}
+        options.put("rest.sigv4-enabled", "true");
+        // Forbidden: {"message":"Credential should be scoped to correct 
service: 'glue'. "}
+        options.put("rest.signing-name", "DlfNext");
+        //  Forbidden: {"message":"The security token included in the request 
is invalid."}
+        options.put("rest.access-key-id", ak);
+        // Forbidden: {"message":"The request signature we calculated does not 
match the signature you provided.
+        // Check your AWS Secret Access Key and signing method. Consult the 
service documentation for details."}
+        options.put("rest.secret-access-key", sk);
+        // same as AwsClientProperties.CLIENT_REGION, "ap-east-1"
+        options.put("rest.signing-region", "cn-beijing");
+        //options.put("rest.auth.type", "sigv4");
+
+        // options.put("iceberg.catalog.warehouse", 
"<accountid>:s3tablescatalog/<table-bucket-name>");
+        // 4. Build iceberg catalog
+        Configuration conf = new Configuration();
+        return CatalogUtil.buildIcebergCatalog("dlf_test", options, conf);
+    }
+
+    /**
+     * CREATE CATALOG dlf PROPERTIES (
+     *     'type' = 'iceberg',
+     *     'warehouse' = 's3://warehouse',
+     *     'iceberg.catalog.type' = 'rest',
+     *     'iceberg.rest.uri' = 
'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg',
+     *     'oss.endpoint' = 'https://oss-cn-beijing.aliyuncs.com',
+     *     'oss.access_key' = '<your-access-key>',
+     *     'oss.secret_key' = '<your-secret-key>',
+     *     'oss.region' = 'cn-beijing',
+     *     'iceberg.rest.sigv4-enabled' = 'true',
+     *     'iceberg.rest.signing-name' = 'DlfNext',
+     *     'iceberg.rest.signing-region' = 'cn-beijing',
+     *     'iceberg.rest.access-key-id' = '<your-access-key>',
+     *     'iceberg.rest.secret-access-key' = 
'<your-secret-key>',
+     *     'iceberg.rest.auth.type' = 'sigv4'
+     * );
+     */
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
index b35782b2033..25b7f3ecdda 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergJdbcMetaStorePropertiesTest.java
@@ -39,10 +39,9 @@ public class IcebergJdbcMetaStorePropertiesTest {
         jdbcProps.initNormalizeAndCheckProps();
 
         Map<String, String> catalogProps = 
jdbcProps.getIcebergJdbcCatalogProperties();
-        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_JDBC,
-                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_JDBC,
+                catalogProps.get(CatalogProperties.CATALOG_IMPL));
         Assertions.assertEquals("jdbc:mysql://localhost:3306/iceberg", 
catalogProps.get(CatalogProperties.URI));
-        Assertions.assertEquals("s3://warehouse/path", 
catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION));
         Assertions.assertEquals("iceberg", catalogProps.get("jdbc.user"));
         Assertions.assertEquals("secret", catalogProps.get("jdbc.password"));
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
index 226f6dd6551..faf6f95d3b1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergRestPropertiesTest.java
@@ -17,13 +17,22 @@
 
 package org.apache.doris.datasource.property.metastore;
 
+import org.apache.doris.datasource.property.storage.OSSProperties;
+import org.apache.doris.datasource.property.storage.S3Properties;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IcebergRestPropertiesTest {
@@ -39,8 +48,8 @@ public class IcebergRestPropertiesTest {
         restProps.initNormalizeAndCheckProps();
 
         Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
-        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST,
-                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_REST,
+                catalogProps.get(CatalogProperties.CATALOG_IMPL));
         Assertions.assertEquals("http://localhost:8080";, 
catalogProps.get(CatalogProperties.URI));
         Assertions.assertEquals("s3://warehouse/path", 
catalogProps.get(CatalogProperties.WAREHOUSE_LOCATION));
         Assertions.assertEquals("prefix", catalogProps.get("prefix"));
@@ -172,8 +181,8 @@ public class IcebergRestPropertiesTest {
 
         Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
         // Should only have basic properties, no OAuth2 properties
-        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST,
-                catalogProps.get(CatalogUtil.ICEBERG_CATALOG_TYPE));
+        Assertions.assertEquals(CatalogUtil.ICEBERG_CATALOG_REST,
+                catalogProps.get(CatalogProperties.CATALOG_IMPL));
         Assertions.assertEquals("http://localhost:8080";, 
catalogProps.get(CatalogProperties.URI));
         
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.CREDENTIAL));
         
Assertions.assertFalse(catalogProps.containsKey(OAuth2Properties.TOKEN));
@@ -270,7 +279,7 @@ public class IcebergRestPropertiesTest {
         restProps.initNormalizeAndCheckProps();
 
         Map<String, String> catalogProps = 
restProps.getIcebergRestCatalogProperties();
-        Assertions.assertEquals("glue", catalogProps.get("rest.signing-name"));
+        Assertions.assertEquals("GLUE", catalogProps.get("rest.signing-name"));
         Assertions.assertEquals("us-west-2", 
catalogProps.get("rest.signing-region"));
     }
 
@@ -427,4 +436,113 @@ public class IcebergRestPropertiesTest {
                 || errorMessage.contains("secret-access-key")
                 || errorMessage.contains("sigv4-enabled"));
     }
+
+    @Test
+    public void testToFileIOPropertiesPrefersNonS3Properties() {
+        // When both S3Properties and OSSProperties exist, OSSProperties 
should be chosen
+        Map<String, String> s3Props = new HashMap<>();
+        s3Props.put("s3.endpoint", "https://s3.us-east-1.amazonaws.com";);
+        s3Props.put("s3.access_key", "s3AccessKey");
+        s3Props.put("s3.secret_key", "s3SecretKey");
+        s3Props.put("s3.region", "us-east-1");
+        s3Props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        S3Properties s3 = (S3Properties) 
StorageProperties.createPrimary(s3Props);
+
+        Map<String, String> ossProps = new HashMap<>();
+        ossProps.put("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
+        ossProps.put("oss.access_key", "ossAccessKey");
+        ossProps.put("oss.secret_key", "ossSecretKey");
+        ossProps.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        OSSProperties oss = (OSSProperties) 
StorageProperties.createPrimary(ossProps);
+
+        Map<String, String> restPropsMap = new HashMap<>();
+        restPropsMap.put("iceberg.rest.uri", "http://localhost:8080";);
+        IcebergRestProperties restProps = new 
IcebergRestProperties(restPropsMap);
+        restProps.initNormalizeAndCheckProps();
+
+        List<StorageProperties> storageList = new ArrayList<>();
+        storageList.add(s3);
+        storageList.add(oss);
+
+        Map<String, String> fileIOProperties = new HashMap<>();
+        Configuration conf = new Configuration();
+        restProps.toFileIOProperties(storageList, fileIOProperties, conf);
+
+        // OSSProperties should be used, not S3Properties
+        Assertions.assertEquals("oss-cn-beijing.aliyuncs.com", 
fileIOProperties.get(S3FileIOProperties.ENDPOINT));
+        Assertions.assertEquals("ossAccessKey", 
fileIOProperties.get(S3FileIOProperties.ACCESS_KEY_ID));
+        Assertions.assertEquals("ossSecretKey", 
fileIOProperties.get(S3FileIOProperties.SECRET_ACCESS_KEY));
+    }
+
+    @Test
+    public void testToFileIOPropertiesFallsBackToS3Properties() {
+        // When only S3Properties exists, it should be used
+        Map<String, String> s3Props = new HashMap<>();
+        s3Props.put("s3.endpoint", "https://s3.us-east-1.amazonaws.com";);
+        s3Props.put("s3.access_key", "s3AccessKey");
+        s3Props.put("s3.secret_key", "s3SecretKey");
+        s3Props.put("s3.region", "us-east-1");
+        s3Props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        S3Properties s3 = (S3Properties) 
StorageProperties.createPrimary(s3Props);
+
+        Map<String, String> restPropsMap = new HashMap<>();
+        restPropsMap.put("iceberg.rest.uri", "http://localhost:8080";);
+        IcebergRestProperties restProps = new 
IcebergRestProperties(restPropsMap);
+        restProps.initNormalizeAndCheckProps();
+
+        List<StorageProperties> storageList = new ArrayList<>();
+        storageList.add(s3);
+
+        Map<String, String> fileIOProperties = new HashMap<>();
+        Configuration conf = new Configuration();
+        restProps.toFileIOProperties(storageList, fileIOProperties, conf);
+
+        Assertions.assertEquals("https://s3.us-east-1.amazonaws.com";, 
fileIOProperties.get(S3FileIOProperties.ENDPOINT));
+        Assertions.assertEquals("s3AccessKey", 
fileIOProperties.get(S3FileIOProperties.ACCESS_KEY_ID));
+        Assertions.assertEquals("us-east-1", 
fileIOProperties.get(AwsClientProperties.CLIENT_REGION));
+    }
+
+    @Test
+    public void testToFileIOPropertiesOnlyFirstNonS3Used() {
+        // When S3Properties comes first, then two non-S3 types, only the 
first non-S3 is used
+        Map<String, String> s3Props = new HashMap<>();
+        s3Props.put("s3.endpoint", "https://s3.amazonaws.com";);
+        s3Props.put("s3.access_key", "s3AK");
+        s3Props.put("s3.secret_key", "s3SK");
+        s3Props.put("s3.region", "us-east-1");
+        s3Props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        S3Properties s3 = (S3Properties) 
StorageProperties.createPrimary(s3Props);
+
+        Map<String, String> ossProps1 = new HashMap<>();
+        ossProps1.put("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
+        ossProps1.put("oss.access_key", "ossAK1");
+        ossProps1.put("oss.secret_key", "ossSK1");
+        ossProps1.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        OSSProperties oss1 = (OSSProperties) 
StorageProperties.createPrimary(ossProps1);
+
+        Map<String, String> ossProps2 = new HashMap<>();
+        ossProps2.put("oss.endpoint", "oss-cn-shanghai.aliyuncs.com");
+        ossProps2.put("oss.access_key", "ossAK2");
+        ossProps2.put("oss.secret_key", "ossSK2");
+        ossProps2.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        OSSProperties oss2 = (OSSProperties) 
StorageProperties.createPrimary(ossProps2);
+
+        Map<String, String> restPropsMap = new HashMap<>();
+        restPropsMap.put("iceberg.rest.uri", "http://localhost:8080";);
+        IcebergRestProperties restProps = new 
IcebergRestProperties(restPropsMap);
+        restProps.initNormalizeAndCheckProps();
+
+        List<StorageProperties> storageList = new ArrayList<>();
+        storageList.add(s3);
+        storageList.add(oss1);
+        storageList.add(oss2);
+
+        Map<String, String> fileIOProperties = new HashMap<>();
+        Configuration conf = new Configuration();
+        restProps.toFileIOProperties(storageList, fileIOProperties, conf);
+
+        // First non-S3Properties (oss1) should be used
+        Assertions.assertEquals("oss-cn-beijing.aliyuncs.com", 
fileIOProperties.get(S3FileIOProperties.ENDPOINT));
+        Assertions.assertEquals("ossAK1", 
fileIOProperties.get(S3FileIOProperties.ACCESS_KEY_ID));
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergUnityCatalogRestCatalogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergUnityCatalogRestCatalogTest.java
index a60074a807c..88862355c0a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergUnityCatalogRestCatalogTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/IcebergUnityCatalogRestCatalogTest.java
@@ -104,7 +104,6 @@ public class IcebergUnityCatalogRestCatalogTest {
         return CatalogUtil.buildIcebergCatalog("databricks_test", options, 
conf);
     }
 
-
     @Test
     public void rawTest() {
         Map<String, String> options = Maps.newHashMap();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
index de919e723c5..ef0d5b46e24 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java
@@ -56,7 +56,7 @@ public class COSPropertiesTest {
         origProps.put("test_non_storage_param", "6000");
         Assertions.assertDoesNotThrow(() -> 
StorageProperties.createAll(origProps));
         origProps.put("cos.endpoint", "cos.ap-beijing-1.myqcloud.com");
-        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(1);
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
         Map<String, String> cosConfig = cosProperties.getMatchedProperties();
         
Assertions.assertTrue(!cosConfig.containsKey("test_non_storage_param"));
 
@@ -90,7 +90,7 @@ public class COSPropertiesTest {
         origProps.put(StorageProperties.FS_COS_SUPPORT, "true");
         //origProps.put("cos.region", "ap-beijing");
 
-        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(1);
+        COSProperties cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
         Map<String, String> s3Props = 
cosProperties.generateBackendS3Configuration();
         Map<String, String> cosConfig = cosProperties.getMatchedProperties();
         
Assertions.assertTrue(!cosConfig.containsKey("test_non_storage_param"));
@@ -110,8 +110,8 @@ public class COSPropertiesTest {
         Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
         Assertions.assertEquals("false", s3Props.get("use_path_style"));
         origProps.put("cos.use_path_style", "true");
-        cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(1);
-        Assertions.assertEquals(HdfsProperties.class, 
StorageProperties.createAll(origProps).get(0).getClass());
+        cosProperties = (COSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals(1, 
StorageProperties.createAll(origProps).size());
         s3Props = cosProperties.generateBackendS3Configuration();
         Assertions.assertEquals("true", s3Props.get("use_path_style"));
         // Add any additional assertions for other properties if needed
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
index f08168965d2..620845a9bea 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java
@@ -70,8 +70,8 @@ public class OBSPropertyTest {
         origProps.put("obs.use_path_style", "true");
         origProps.put("test_non_storage_param", "test_non_storage_value");
         origProps.put(StorageProperties.FS_OBS_SUPPORT, "true");
-        OBSProperties obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(1);
-        Assertions.assertEquals(HdfsProperties.class, 
StorageProperties.createAll(origProps).get(0).getClass());
+        OBSProperties obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals(1, 
StorageProperties.createAll(origProps).size());
         Map<String, String> s3Props = new HashMap<>();
         Map<String, String> obsConfig = obsProperties.getMatchedProperties();
         
Assertions.assertTrue(!obsConfig.containsKey("test_non_storage_param"));
@@ -92,12 +92,11 @@ public class OBSPropertyTest {
         Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
         Assertions.assertEquals("true", s3Props.get("use_path_style"));
         origProps.remove("obs.use_path_style");
-        obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(1);
+        obsProperties = (OBSProperties) 
StorageProperties.createAll(origProps).get(0);
         s3Props = obsProperties.getBackendConfigProperties();
         Assertions.assertEquals("false", s3Props.get("use_path_style"));
     }
 
-
     @Test
     public void testGetRegion() throws UserException {
         origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
index 912f20b3144..fce9934cdaa 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -70,8 +70,7 @@ public class OSSPropertiesTest {
         origProps.put("oss.connection.timeout", "1000");
         origProps.put("oss.use_path_style", "true");
         origProps.put("test_non_storage_param", "6000");
-        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(1);
-        Assertions.assertEquals(HdfsProperties.class, 
StorageProperties.createAll(origProps).get(0).getClass());
+        OSSProperties ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(0);
         Map<String, String> s3Props;
 
         Map<String, String> ossConfig = ossProperties.getMatchedProperties();
@@ -94,7 +93,7 @@ public class OSSPropertiesTest {
         Assertions.assertEquals("1000", 
s3Props.get("AWS_CONNECTION_TIMEOUT_MS"));
         Assertions.assertEquals("true", s3Props.get("use_path_style"));
         origProps.remove("oss.use_path_style");
-        ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(1);
+        ossProperties = (OSSProperties) 
StorageProperties.createAll(origProps).get(0);
         s3Props = ossProperties.generateBackendS3Configuration();
         Assertions.assertEquals("false", s3Props.get("use_path_style"));
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java
index fe6701f9163..42fc24c3209 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java
@@ -107,8 +107,7 @@ public class OzonePropertiesTest {
         origProps.put("use_path_style", "true");
 
         List<StorageProperties> properties = 
StorageProperties.createAll(origProps);
-        Assertions.assertEquals(HdfsProperties.class, 
properties.get(0).getClass());
-        Assertions.assertEquals(OzoneProperties.class, 
properties.get(1).getClass());
+        Assertions.assertEquals(OzoneProperties.class, 
properties.get(0).getClass());
 
         Map<StorageProperties.Type, StorageProperties> propertiesMap = 
properties.stream()
                 .collect(Collectors.toMap(StorageProperties::getType, 
Function.identity()));
@@ -127,10 +126,9 @@ public class OzonePropertiesTest {
         origProps.put("ozone.region", "us-east-1");
 
         List<StorageProperties> properties = 
StorageProperties.createAll(origProps);
-        Assertions.assertEquals(HdfsProperties.class, 
properties.get(0).getClass());
-        Assertions.assertEquals(OzoneProperties.class, 
properties.get(1).getClass());
+        Assertions.assertEquals(OzoneProperties.class, 
properties.get(0).getClass());
 
-        OzoneProperties ozoneProperties = (OzoneProperties) properties.get(1);
+        OzoneProperties ozoneProperties = (OzoneProperties) properties.get(0);
         Assertions.assertEquals("hadoop", 
ozoneProperties.getHadoopStorageConfig().get("fs.s3a.access.key"));
         Assertions.assertEquals("hadoop", 
ozoneProperties.getHadoopStorageConfig().get("fs.s3a.secret.key"));
         Assertions.assertEquals("http://ozone-s3g:9878";, 
ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint"));
@@ -176,8 +174,7 @@ public class OzonePropertiesTest {
 
         origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true");
         List<StorageProperties> propertiesWithFlag = 
StorageProperties.createAll(origProps);
-        Assertions.assertEquals(2, propertiesWithFlag.size());
-        Assertions.assertEquals(HdfsProperties.class, 
propertiesWithFlag.get(0).getClass());
-        Assertions.assertEquals(OzoneProperties.class, 
propertiesWithFlag.get(1).getClass());
+        Assertions.assertEquals(1, propertiesWithFlag.size());
+        Assertions.assertEquals(OzoneProperties.class, 
propertiesWithFlag.get(0).getClass());
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
index aa1c95b433d..d8b1338fd7c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java
@@ -130,8 +130,8 @@ public class S3PropertiesTest {
         origProps.put("s3.connection.timeout", "6000");
         origProps.put("test_non_storage_param", "6000");
         origProps.put("s3.endpoint", "s3.us-west-1.amazonaws.com");
-        S3Properties s3Properties = (S3Properties) 
StorageProperties.createAll(origProps).get(1);
-        Assertions.assertEquals(HdfsProperties.class, 
StorageProperties.createAll(origProps).get(0).getClass());
+        S3Properties s3Properties = (S3Properties) 
StorageProperties.createAll(origProps).get(0);
+        Assertions.assertEquals(1, 
StorageProperties.createAll(origProps).size());
         Map<String, String> s3Props = 
s3Properties.getBackendConfigProperties();
         Map<String, String> s3Config = s3Properties.getMatchedProperties();
         Assertions.assertTrue(!s3Config.containsKey("test_non_storage_param"));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/StoragePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/StoragePropertiesTest.java
new file mode 100644
index 00000000000..022c1e9b890
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/StoragePropertiesTest.java
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Unit tests for {@link StorageProperties}, focusing on the explicit {@code 
fs.xx.support}
+ * flag behavior that controls whether heuristic-based {@code guessIsMe} 
detection is used.
+ *
+ * <p>Background: Each storage provider uses an OR logic of
+ * {@code isFsSupport(props, FS_XX_SUPPORT) || XXProperties.guessIsMe(props)} 
to detect
+ * whether it should be activated. The {@code guessIsMe} heuristic inspects 
endpoint strings
+ * to infer the provider type. However, some endpoints are ambiguous — for 
example,
+ * {@code aliyuncs.com} can trigger both OSS and S3 guessIsMe, and {@code 
amazonaws.com}
+ * can trigger S3 even when the user only intended OSS.
+ *
+ * <p>The fix: when any {@code fs.xx.support=true} is explicitly set, all 
providers skip
+ * {@code guessIsMe} and rely solely on their respective {@code fs.xx.support} 
flags.
+ * This prevents cross-provider false-positive matches.
+ */
+public class StoragePropertiesTest {
+
+    // 
========================================================================================
+    // 1. Backward compatibility: no explicit fs.xx.support → guessIsMe still 
works
+    // 
========================================================================================
+
+    /**
+     * When no {@code fs.xx.support} flag is set, providers should fall back to
+     * {@code guessIsMe} heuristic. An OSS endpoint containing "aliyuncs.com"
+     * should be detected as OSS via guessIsMe.
+     */
+    @Test
+    public void testNoExplicitSupport_guessIsMeStillWorks_OSS() throws 
UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        // guessIsMe should detect OSS; default HDFS is always prepended
+        Assertions.assertTrue(types.contains(HdfsProperties.class),
+                "Default HDFS should always be present");
+        Assertions.assertTrue(types.contains(OSSProperties.class),
+                "OSS should be detected via guessIsMe when no explicit 
fs.xx.support is set");
+    }
+
+    /**
+     * When no {@code fs.xx.support} flag is set, an S3 endpoint containing
+     * "amazonaws.com" should be detected as S3 via guessIsMe.
+     */
+    @Test
+    public void testNoExplicitSupport_guessIsMeStillWorks_S3() throws 
UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
+        props.put("s3.access_key", "ak");
+        props.put("s3.secret_key", "sk");
+        props.put("s3.region", "us-east-1");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        Assertions.assertTrue(types.contains(S3Properties.class),
+                "S3 should be detected via guessIsMe when no explicit 
fs.xx.support is set");
+    }
+
+    // 
========================================================================================
+    // 2. Core scenario: explicit fs.oss.support=true → S3 guessIsMe is skipped
+    // 
========================================================================================
+
+    /**
+     * When {@code fs.oss.support=true} is explicitly set, even if the 
properties also
+     * contain an S3 endpoint ({@code s3.amazonaws.com}) that would normally 
trigger
+     * {@code S3Properties.guessIsMe}, S3 should NOT be matched.
+     * Only OSS should appear — the default HDFS fallback is also skipped in 
explicit mode.
+     *
+     * <p>This is the primary scenario that motivated the fix: a user 
configuring an
+     * Aliyun DLF Iceberg catalog with OSS storage, where the endpoint string
+     * accidentally matches S3's guessIsMe heuristic.
+     */
+    @Test
+    public void testExplicitOssSupport_skipsS3GuessIsMe() throws UserException 
{
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        props.put("oss.endpoint", "oss-cn-beijing-internal.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+        // This endpoint would normally trigger S3Properties.guessIsMe
+        props.put("s3.endpoint", "s3.amazonaws.com");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        Assertions.assertEquals(1, all.size(),
+                "Should only contain OSS — no default HDFS in explicit mode");
+        Assertions.assertTrue(types.contains(OSSProperties.class));
+        Assertions.assertFalse(types.contains(S3Properties.class),
+                "S3 should NOT be matched when fs.oss.support is explicitly 
set");
+        Assertions.assertFalse(types.contains(HdfsProperties.class),
+                "Default HDFS should NOT be added in explicit mode");
+    }
+
+    // 
========================================================================================
+    // 3. Symmetric scenario: explicit fs.s3.support=true → OSS guessIsMe is 
skipped
+    // 
========================================================================================
+
+    /**
+     * When {@code fs.s3.support=true} is explicitly set, even if the 
properties also
+     * contain an OSS endpoint ({@code aliyuncs.com}) that would normally 
trigger
+     * {@code OSSProperties.guessIsMe}, OSS should NOT be matched.
+     * Default HDFS fallback is also skipped in explicit mode.
+     */
+    @Test
+    public void testExplicitS3Support_skipsOssGuessIsMe() throws UserException 
{
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
+        props.put("s3.access_key", "ak");
+        props.put("s3.secret_key", "sk");
+        props.put("s3.region", "us-east-1");
+        // This endpoint would normally trigger OSSProperties.guessIsMe
+        props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        Assertions.assertEquals(1, all.size(),
+                "Should only contain S3 — no default HDFS in explicit mode");
+        Assertions.assertTrue(types.contains(S3Properties.class),
+                "S3 should be matched via explicit fs.s3.support");
+        Assertions.assertFalse(types.contains(OSSProperties.class),
+                "OSS should NOT be matched when fs.s3.support is explicitly 
set");
+        Assertions.assertFalse(types.contains(HdfsProperties.class),
+                "Default HDFS should NOT be added in explicit mode");
+    }
+
+    // 
========================================================================================
+    // 4. Multiple explicit flags: only flagged providers are matched
+    // 
========================================================================================
+
+    /**
+     * When multiple {@code fs.xx.support=true} flags are set (e.g., OSS + S3),
+     * both should be matched, but other providers whose guessIsMe might fire
+     * (e.g., COS via myqcloud.com endpoint) should NOT be matched.
+     * Default HDFS fallback is also skipped in explicit mode.
+     */
+    @Test
+    public void testMultipleExplicitSupport_onlyFlaggedProvidersMatched() 
throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        props.put("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
+        props.put("s3.access_key", "ak");
+        props.put("s3.secret_key", "sk");
+        props.put("s3.region", "us-east-1");
+        // COS endpoint that would trigger COSProperties.guessIsMe
+        props.put("cos.endpoint", "cos.ap-guangzhou.myqcloud.com");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        Assertions.assertEquals(2, all.size(),
+                "Should only contain OSS + S3, no default HDFS and no COS");
+        Assertions.assertTrue(types.contains(OSSProperties.class),
+                "OSS should be matched via explicit flag");
+        Assertions.assertTrue(types.contains(S3Properties.class),
+                "S3 should be matched via explicit flag");
+        Assertions.assertFalse(types.contains(COSProperties.class),
+                "COS should NOT be matched — no fs.cos.support=true, and 
guessIsMe is disabled");
+        Assertions.assertFalse(types.contains(HdfsProperties.class),
+                "Default HDFS should NOT be added in explicit mode");
+    }
+
+    // 
========================================================================================
+    // 5. createPrimary also respects the explicit support logic
+    // 
========================================================================================
+
+    /**
+     * {@code createPrimary} should also skip guessIsMe when an explicit flag 
is set.
+     * With {@code fs.s3.support=true}, even if OSS endpoint is present and 
would
+     * match via guessIsMe, createPrimary should not return OSS.
+     */
+    @Test
+    public void testCreatePrimary_explicitSupport_skipsGuessIsMe() {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_S3_SUPPORT, "true");
+        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
+        props.put("s3.access_key", "ak");
+        props.put("s3.secret_key", "sk");
+        props.put("s3.region", "us-east-1");
+        // OSS endpoint that would normally trigger OSSProperties.guessIsMe
+        props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+
+        // HDFS is first in PROVIDERS list, but without fs.hdfs.support and 
guessIsMe disabled,
+        // it won't match. S3 should be the first match via its explicit flag.
+        StorageProperties primary = StorageProperties.createPrimary(props);
+        Assertions.assertInstanceOf(S3Properties.class, primary,
+                "createPrimary should return S3 (first explicit match), not 
OSS via guessIsMe");
+    }
+
+    // 
========================================================================================
+    // 6. Default HDFS fallback is skipped when explicit support is set
+    // 
========================================================================================
+
+    /**
+     * When {@code fs.oss.support=true} is set and no HDFS-specific properties 
exist,
+     * the default HDFS fallback should NOT be added. In explicit mode, only 
the
+     * providers with matching {@code fs.xx.support=true} flags are included.
+     */
+    @Test
+    public void testExplicitSupport_noDefaultHdfsFallback() throws 
UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        props.put("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+
+        Assertions.assertEquals(1, all.size(),
+                "Should only contain OSS, no default HDFS in explicit mode");
+        Assertions.assertInstanceOf(OSSProperties.class, all.get(0));
+    }
+
+    /**
+     * When {@code fs.hdfs.support=true} is explicitly set alongside another 
provider,
+     * HDFS should appear because it was explicitly requested, not as a 
default fallback.
+     */
+    @Test
+    public void testExplicitHdfsSupport_hdfsIncluded() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_HDFS_SUPPORT, "true");
+        props.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        props.put("oss.endpoint", "oss-cn-beijing.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        Assertions.assertEquals(2, all.size(),
+                "Should contain HDFS (explicit) + OSS");
+        Assertions.assertTrue(types.contains(HdfsProperties.class),
+                "HDFS should be present because fs.hdfs.support=true was 
explicitly set");
+        Assertions.assertTrue(types.contains(OSSProperties.class));
+    }
+
+    // 
========================================================================================
+    // 7. Edge case: fs.xx.support=false does NOT count as explicit
+    // 
========================================================================================
+
+    /**
+     * Setting {@code fs.oss.support=false} should NOT be treated as an 
explicit flag.
+     * guessIsMe should still be active in this case.
+     */
+    @Test
+    public void testFsSupportFalse_doesNotDisableGuessIsMe() throws 
UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_OSS_SUPPORT, "false");
+        props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        // fs.oss.support=false means no explicit flag is truly set,
+        // so guessIsMe should still work and detect OSS via endpoint
+        Assertions.assertTrue(types.contains(OSSProperties.class),
+                "OSS should still be detected via guessIsMe when 
fs.oss.support=false");
+    }
+
+    // 
========================================================================================
+    // 8. Real-world DLF scenario: OSS explicit + aliyuncs.com endpoint
+    // 
========================================================================================
+
+    /**
+     * Simulates a real-world Aliyun DLF Iceberg catalog scenario:
+     * the user sets {@code fs.oss.support=true} and provides an OSS endpoint
+     * containing "aliyuncs.com". Without the fix, "aliyuncs.com" could also
+     * match other providers' guessIsMe (since some providers check s3.endpoint
+     * which might be set to an aliyuncs.com URL). With the fix, only OSS is 
matched,
+     * and the default HDFS fallback is also skipped.
+     */
+    @Test
+    public void testDlfIcebergScenario_explicitOss_noFalsePositives() throws 
UserException {
+        Map<String, String> props = new HashMap<>();
+        // User explicitly declares OSS storage
+        props.put(StorageProperties.FS_OSS_SUPPORT, "true");
+        props.put("oss.endpoint", "oss-cn-beijing-internal.aliyuncs.com");
+        props.put("oss.access_key", "ak");
+        props.put("oss.secret_key", "sk");
+        props.put("oss.region", "cn-beijing");
+        // DLF catalog properties that might contain ambiguous endpoints
+        props.put("type", "iceberg");
+        props.put("iceberg.catalog.type", "rest");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        // Only OSS should be present — no default HDFS, no false positives
+        Assertions.assertEquals(1, all.size(),
+                "DLF scenario: should only have OSS, no default HDFS and no 
false positives");
+        Assertions.assertTrue(types.contains(OSSProperties.class));
+        Assertions.assertFalse(types.contains(HdfsProperties.class),
+                "Default HDFS should not appear in explicit mode");
+        Assertions.assertFalse(types.contains(S3Properties.class),
+                "S3 should not appear in DLF OSS scenario");
+    }
+
+    /**
+     * When {@code fs.minio.support=true} is explicitly set, MinIO properties
+     * should be created directly from the explicit flag, since MinIO endpoints
+     * cannot be reliably detected via the guessIsMe heuristic.
+     */
+    @Test
+    public void testMinioFixed_explicitMinio() throws UserException {
+        Map<String, String> props = new HashMap<>();
+        props.put(StorageProperties.FS_MINIO_SUPPORT, "true");
+        props.put("minio.endpoint", "http://minio.br88:9000");
+        props.put("minio.access_key", "ak");
+        props.put("minio.secret_key", "sk");
+        props.put("s3.region", "minio");
+
+        List<StorageProperties> all = StorageProperties.createAll(props);
+        List<Class<?>> types = toTypeList(all);
+
+        // fs.minio.support=true is an explicit flag, so MinIO is selected
+        // directly without relying on the guessIsMe endpoint heuristic
+        Assertions.assertTrue(types.contains(MinioProperties.class),
+                "Minio should be created when fs.minio.support=true");
+    }
+
+    // 
========================================================================================
+    // Helper
+    // 
========================================================================================
+
+    private static List<Class<?>> toTypeList(List<StorageProperties> all) {
+        return all.stream()
+                .map(Object::getClass)
+                .collect(Collectors.toList());
+    }
+
+    // 
========================================================================================
+    // 9. getRegionFromProperties tests
+    // 
========================================================================================
+
+    @Test
+    public void testGetRegionFromProperties_s3Region() {
+        Map<String, String> props = new HashMap<>();
+        props.put("s3.region", "us-west-2");
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertEquals("us-west-2", region);
+    }
+
+    @Test
+    public void testGetRegionFromProperties_ossRegion() {
+        Map<String, String> props = new HashMap<>();
+        props.put("oss.region", "cn-hangzhou");
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertEquals("cn-hangzhou", region);
+    }
+
+    @Test
+    public void testGetRegionFromProperties_awsRegion() {
+        Map<String, String> props = new HashMap<>();
+        props.put("AWS_REGION", "eu-west-1");
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertEquals("eu-west-1", region);
+    }
+
+    @Test
+    public void testGetRegionFromProperties_cosRegion() {
+        Map<String, String> props = new HashMap<>();
+        props.put("cos.region", "ap-guangzhou");
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertEquals("ap-guangzhou", region);
+    }
+
+    @Test
+    public void testGetRegionFromProperties_noRegion() {
+        Map<String, String> props = new HashMap<>();
+        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertNull(region);
+    }
+
+    @Test
+    public void testGetRegionFromProperties_emptyProps() {
+        Map<String, String> props = new HashMap<>();
+        String region = 
AbstractS3CompatibleProperties.getRegionFromProperties(props);
+        Assertions.assertNull(region);
+    }
+}
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 79b4e5c20bc..570ec50b798 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -331,3 +331,5 @@ hudiHmsPort=19083
 hudiMinioPort=19100
 hudiMinioAccessKey="minio"
 hudiMinioSecretKey="minio123"
+
+icebergDlfRestCatalog="'type' = 'iceberg', 'warehouse' = 
'new_dlf_iceberg_catalog', 'iceberg.catalog.type' = 'rest', 'iceberg.rest.uri' 
= 'http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg', 
'iceberg.rest.sigv4-enabled' = 'true', 'iceberg.rest.signing-name' = 'DlfNext', 
'iceberg.rest.access-key-id' = 'ak', 'iceberg.rest.secret-access-key' = 'sk', 
'iceberg.rest.signing-region' = 'cn-beijing', 
'iceberg.rest.vended-credentials-enabled' = 'true', 'io-impl' = 
'org.apache.iceberg.rest.DlfFileIO', [...]
diff --git 
a/regression-test/data/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.out
 
b/regression-test/data/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.out
new file mode 100644
index 00000000000..ed576ce0b69
--- /dev/null
+++ 
b/regression-test/data/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !c1 --
+1      abc
+2      def
+
+-- !c2 --
+PARQUET
+
+-- !c3 --
+1      abc
+2      def
+
+-- !c4 --
+PARQUET
+
+-- !c5 --
+
diff --git 
a/regression-test/suites/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.groovy
 
b/regression-test/suites/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.groovy
new file mode 100644
index 00000000000..69550a34d62
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/iceberg/test_iceberg_dlf_rest_catalog.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_dlf_rest_catalog", 
"p2,external,iceberg,external_remote,external_remote_iceberg") {
+    String catalog = "test_iceberg_dlf_rest_catalog"
+    String prop= context.config.otherConfigs.get("icebergDlfRestCatalog")
+
+    sql """drop catalog if exists ${catalog};"""
+    sql """
+        create catalog if not exists ${catalog} properties (
+            ${prop}
+        );
+    """
+
+    sql """ use ${catalog}.test_iceberg_db"""
+
+    qt_c1 """ select * from test_iceberg_table order by k1 """
+    qt_c2 """  select file_format from 
test_iceberg_db.test_iceberg_table\$files;"""
+
+    def uuid = UUID.randomUUID().toString().replace("-", "").substring(0, 8)
+    sql """drop database if exists dlf_iceberg_db_${uuid} force"""
+    sql """create database dlf_iceberg_db_${uuid}"""
+    sql """use dlf_iceberg_db_${uuid}"""
+    sql """create table dlf_iceberg_test_tbl (k1 int, k2 string);"""
+    sql """insert into dlf_iceberg_test_tbl values(1, 'abc'),(2, 'def');"""
+    qt_c3 """select * from dlf_iceberg_test_tbl order by k1"""
+    qt_c4 """select file_format from dlf_iceberg_test_tbl\$files"""
+    sql """drop table dlf_iceberg_test_tbl"""
+    qt_c5 """show tables"""
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to