morningman commented on code in PR #52084:
URL: https://github.com/apache/doris/pull/52084#discussion_r2166041680


##########
fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum SchemaTypeMapper {
+
+    S3("s3", StorageProperties.Type.S3, FileSystemType.S3, TFileType.FILE_S3),
+    S3A("s3a", StorageProperties.Type.S3, FileSystemType.S3, 
TFileType.FILE_S3),
+    S3N("s3n", StorageProperties.Type.S3, FileSystemType.S3, 
TFileType.FILE_S3),
+    COSN("cosn", StorageProperties.Type.COS, FileSystemType.S3, 
TFileType.FILE_S3),
+    OFS("ofs", StorageProperties.Type.BROKER, FileSystemType.OFS, 
TFileType.FILE_BROKER),
+    GFS("gfs", StorageProperties.Type.BROKER, FileSystemType.HDFS, 
TFileType.FILE_BROKER),
+    JFS("jfs", StorageProperties.Type.BROKER, FileSystemType.JFS, 
TFileType.FILE_BROKER),
+    VIEWFS("viewfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS),
+    //LAKEFS("lakefs", StorageProperties.Type.LAKEFS),
+    //GCS("gs", StorageProperties.Type.S3),
+    //BOS("bos", StorageProperties.Type.BOS),
+    FILE("file", StorageProperties.Type.LOCAL, FileSystemType.FILE, 
TFileType.FILE_LOCAL),
+
+    OSS("oss", StorageProperties.Type.OSS, FileSystemType.S3, 
TFileType.FILE_S3),
+    OBS("obs", StorageProperties.Type.OBS, FileSystemType.S3, 
TFileType.FILE_S3),
+    COS("cos", StorageProperties.Type.COS, FileSystemType.S3, 
TFileType.FILE_S3),
+    //MINIO("minio", StorageProperties.Type.MINIO),
+    /*
+     * Only secure protocols are supported to ensure safe access to Azure 
storage services.
+     * This implementation allows only "abfss" and "wasbs" schemes, which 
operate over HTTPS.
+     * Insecure or deprecated schemes such as "abfs", "wasb", and "adl" are 
explicitly unsupported.
+     * */
+    ABFSS("abfss", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    WASBS("wasbs", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    AZURE("azure", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    HDFS("hdfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS),
+    LOCAL("local", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS);
+    private final String schema;
+    private final StorageProperties.Type storageType;
+    private final FileSystemType fileSystemType;
+    private final TFileType fileType;
+
+    SchemaTypeMapper(String schema, StorageProperties.Type storageType, 
FileSystemType fileSystemType,
+                     TFileType fileType) {
+        this.schema = schema;
+        this.storageType = storageType;
+        this.fileSystemType = fileSystemType;
+        this.fileType = fileType;
+    }
+
+
+    private static final Map<String, StorageProperties.Type> 
SCHEMA_TO_TYPE_MAP = new HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.storageType);
+        }
+    }
+
+    private static final Map<String, FileSystemType> SCHEMA_TO_FS_TYPE_MAP = 
new HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_FS_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.fileSystemType);
+        }
+    }
+
+    private static final Map<String, TFileType> SCHEMA_TO_FILE_TYPE_MAP = new 
HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_FILE_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.fileType);
+        }
+    }
+
+    /*
+     * Compatibility note:
+     * When processing HDFS paths, if the URI lacks a schema (protocol),
+     * it is assumed to be of "hdfs" type by default. This is a compatibility 
sacrifice
+     * made to support legacy behaviors.
+     *
+     * Legacy systems often omitted the schema in HDFS paths, e.g. 
"/user/hadoop/data"
+     * instead of "hdfs:///user/hadoop/data". To avoid breaking existing code,
+     * this default assumption is applied for smoother compatibility and 
migration.
+     */
+    public static StorageProperties.Type fromSchema(String schema) {
+        if (StringUtils.isBlank(schema)) {
+            return StorageProperties.Type.HDFS;
+        }
+        return SCHEMA_TO_TYPE_MAP.get(schema.toLowerCase());
+    }
+
+    public static FileSystemType fromSchemaToFileSystemType(String schema) {
+        if (schema == null) {

Review Comment:
   Unify the logic to use `StringUtils.isBlank(schema)`, the same as in `public static 
StorageProperties.Type fromSchema(String schema)`.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java:
##########
@@ -60,7 +60,7 @@ public FileSplit(LocationPath path, long start, long length, 
long fileLength,
         this.modificationTime = modificationTime < 0 ? 0 : modificationTime;
         this.hosts = hosts == null ? new String[0] : hosts;
         this.partitionValues = partitionValues;
-        this.locationType = path.isBindBroker() ?  TFileType.FILE_BROKER : 
path.getTFileTypeForBE();
+        this.locationType = path.getTFileTypeForBE();

Review Comment:
   How to handle broker now?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -345,20 +338,18 @@ private Map<PartitionCacheKey, HivePartition> 
loadPartitions(Iterable<? extends
     }
 
     // Get File Status by using FileSystem API.
-    private FileCacheValue getFileCache(String location, String inputFormat,
-            JobConf jobConf,
-            List<String> partitionValues,
-            String bindBrokerName,
-            DirectoryLister directoryLister,
-            TableIf table) throws UserException {
+    private FileCacheValue getFileCache(LocationPath path, String inputFormat,
+                                        List<String> partitionValues,
+                                        DirectoryLister directoryLister,
+                                        TableIf table) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        Map<String, String> properties = 
catalog.getCatalogProperty().getProperties();
-        RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new 
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, properties, bindBrokerName),
-                        properties,
-                        bindBrokerName, jobConf));
-        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
+
+        FileSystemCache.FileSystemCacheKey fileSystemCacheKey = new 
FileSystemCache.FileSystemCacheKey(
+                path.getFsIdentifier(), path.getStorageProperties()

Review Comment:
   I think it is not a good idea to get the storage properties in `LocationPath`.
   Also, you save the storage properties in `FileSystemCacheKey`, which may 
lead to some problems.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java:
##########
@@ -338,137 +146,161 @@ private static String parseScheme(String finalLocation) 
{
         return scheme;
     }
 
-    private boolean useS3EndPoint(Map<String, String> props) {
-        if (props.containsKey(ObsProperties.ENDPOINT)
-                || props.containsKey(OssProperties.ENDPOINT)
-                || props.containsKey(CosProperties.ENDPOINT)) {
-            return false;
+    /**
+     * Static factory method to create a LocationPath2 instance.
+     *
+     * @param location             the input URI location string
+     * @param storagePropertiesMap map of schema type to corresponding storage 
properties
+     * @return a new LocationPath2 instance
+     * @throws UserException if validation fails or required data is missing
+     */
+    public static LocationPath of(String location,
+                                  Map<StorageProperties.Type, 
StorageProperties> storagePropertiesMap,
+                                  boolean normalize) throws UserException {
+        String schema = extractScheme(location);
+        String normalizedLocation = location;
+        StorageProperties storageProperties = null;
+        StorageProperties.Type type = SchemaTypeMapper.fromSchema(schema);
+        if (StorageProperties.Type.LOCAL.equals(type)) {
+            normalize = false;
+        }
+        if (normalize) {
+            storageProperties = findStorageProperties(type, schema, 
storagePropertiesMap);
+
+            if (storageProperties == null) {
+                throw new UserException("No storage properties found for 
schema: " + schema);
+            }
+            normalizedLocation = 
storageProperties.validateAndNormalizeUri(location);
+            if (StringUtils.isBlank(normalizedLocation)) {
+                throw new UserException("Invalid location: " + location + ", 
normalized location is null");
+            }
         }
-        // wide check range for the compatibility of s3 properties
-        return (props.containsKey(S3Properties.ENDPOINT) || 
props.containsKey(S3Properties.Env.ENDPOINT));
+
+        String encodedLocation = encodedLocation(normalizedLocation);
+        URI uri = URI.create(encodedLocation);
+        String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://" + 
Strings.nullToEmpty(uri.getAuthority());
+
+        return new LocationPath(schema, normalizedLocation, fsIdentifier, 
storageProperties);
     }
 
     /**
-     * The converted path is used for FE to get metadata.
-     * Change http://xxxx to s3://xxxx
+     * Finds the appropriate {@link StorageProperties} configuration for a 
given storage type and schema.
+     *
+     * This method attempts to locate the storage properties using the 
following logic:
      *
-     * @param location origin location
-     * @return metadata location path. just convert when storage is compatible 
with s3 client.
+     * 1. Direct match by type: Attempts to retrieve the properties from the 
map using the given {@code type}.
+     * 2. S3-Minio fallback: If the requested type is S3 and no properties are 
found, try to fall back to MinIO
+     * configuration,
+     *    assuming it is compatible with S3.
+     * 3. Compatibility fallback based on schema:
+     *    In older configurations, the schema name might not strictly match 
the actual storage type.
+     *    For example, a COS storage might use the "s3" schema, or an S3 
storage might use the "cos" schema.
+     *    To handle such legacy inconsistencies, we try to find any storage 
configuration with the name "s3"
+     *    if the schema maps to a file type of FILE_S3.
+     *
+     * @param type the storage type to search for
+     * @param schema the schema string used in the original request (e.g., 
"s3://bucket/file")
+     * @param storagePropertiesMap a map of available storage types to their 
configuration
+     * @return a matching {@link StorageProperties} if found; otherwise, 
{@code null}
      */
-    private static String convertToS3(String location) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("try convert location to s3 prefix: " + location);
+    private static StorageProperties 
findStorageProperties(StorageProperties.Type type, String schema,
+                                                           
Map<StorageProperties.Type, StorageProperties>
+                                                                   
storagePropertiesMap) {
+        // Step 1: Try direct match by type
+        StorageProperties props = storagePropertiesMap.get(type);
+        if (props != null) {
+            return props;
         }
-        int pos = findDomainPos(location);
-        return "s3" + location.substring(pos);
-    }
 
-    private static int findDomainPos(String rangeLocation) {
-        int pos = rangeLocation.indexOf("://");
-        if (pos == -1) {
-            throw new RuntimeException("No '://' found in location: " + 
rangeLocation);
+        // Step 2: Fallback - if type is S3 and MinIO is configured, assume 
it's compatible
+        if (type == StorageProperties.Type.S3
+                && 
storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) {
+            return storagePropertiesMap.get(StorageProperties.Type.MINIO);
         }
-        return pos;
+
+        // Step 3: Compatibility fallback based on schema
+        // In previous configurations, the schema name may not strictly match 
the actual storage type.
+        // For example, a COS storage might use the "s3" schema, or an S3 
storage might use the "cos" schema.
+        // To handle such legacy inconsistencies, we try to find a storage 
configuration whose name is "s3".
+        if 
(TFileType.FILE_S3.equals(SchemaTypeMapper.fromSchemaToFileType(schema))) {
+            return storagePropertiesMap.values().stream()
+                    .filter(p -> "s3".equalsIgnoreCase(p.getStorageName()))
+                    .findFirst()
+                    .orElse(null);
+        }
+
+        // Not found
+        return null;
     }
 
-    @VisibleForTesting
-    public static String normalizedHdfsPath(String location, String host, 
boolean enableOssRootPolicy) {
+
+    private static String encodedLocation(String location) {

Review Comment:
   Or move all static methods together in this class, to make it more readable.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java:
##########
@@ -338,137 +146,161 @@ private static String parseScheme(String finalLocation) 
{
         return scheme;
     }
 
-    private boolean useS3EndPoint(Map<String, String> props) {
-        if (props.containsKey(ObsProperties.ENDPOINT)
-                || props.containsKey(OssProperties.ENDPOINT)
-                || props.containsKey(CosProperties.ENDPOINT)) {
-            return false;
+    /**
+     * Static factory method to create a LocationPath2 instance.
+     *
+     * @param location             the input URI location string
+     * @param storagePropertiesMap map of schema type to corresponding storage 
properties
+     * @return a new LocationPath2 instance
+     * @throws UserException if validation fails or required data is missing
+     */
+    public static LocationPath of(String location,
+                                  Map<StorageProperties.Type, 
StorageProperties> storagePropertiesMap,
+                                  boolean normalize) throws UserException {
+        String schema = extractScheme(location);
+        String normalizedLocation = location;
+        StorageProperties storageProperties = null;
+        StorageProperties.Type type = SchemaTypeMapper.fromSchema(schema);
+        if (StorageProperties.Type.LOCAL.equals(type)) {
+            normalize = false;
+        }
+        if (normalize) {
+            storageProperties = findStorageProperties(type, schema, 
storagePropertiesMap);
+
+            if (storageProperties == null) {
+                throw new UserException("No storage properties found for 
schema: " + schema);
+            }
+            normalizedLocation = 
storageProperties.validateAndNormalizeUri(location);
+            if (StringUtils.isBlank(normalizedLocation)) {
+                throw new UserException("Invalid location: " + location + ", 
normalized location is null");
+            }
         }
-        // wide check range for the compatibility of s3 properties
-        return (props.containsKey(S3Properties.ENDPOINT) || 
props.containsKey(S3Properties.Env.ENDPOINT));
+
+        String encodedLocation = encodedLocation(normalizedLocation);
+        URI uri = URI.create(encodedLocation);
+        String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://" + 
Strings.nullToEmpty(uri.getAuthority());
+
+        return new LocationPath(schema, normalizedLocation, fsIdentifier, 
storageProperties);
     }
 
     /**
-     * The converted path is used for FE to get metadata.
-     * Change http://xxxx to s3://xxxx
+     * Finds the appropriate {@link StorageProperties} configuration for a 
given storage type and schema.
+     *
+     * This method attempts to locate the storage properties using the 
following logic:
      *
-     * @param location origin location
-     * @return metadata location path. just convert when storage is compatible 
with s3 client.
+     * 1. Direct match by type: Attempts to retrieve the properties from the 
map using the given {@code type}.
+     * 2. S3-Minio fallback: If the requested type is S3 and no properties are 
found, try to fall back to MinIO
+     * configuration,
+     *    assuming it is compatible with S3.
+     * 3. Compatibility fallback based on schema:
+     *    In older configurations, the schema name might not strictly match 
the actual storage type.
+     *    For example, a COS storage might use the "s3" schema, or an S3 
storage might use the "cos" schema.
+     *    To handle such legacy inconsistencies, we try to find any storage 
configuration with the name "s3"
+     *    if the schema maps to a file type of FILE_S3.
+     *
+     * @param type the storage type to search for
+     * @param schema the schema string used in the original request (e.g., 
"s3://bucket/file")
+     * @param storagePropertiesMap a map of available storage types to their 
configuration
+     * @return a matching {@link StorageProperties} if found; otherwise, 
{@code null}
      */
-    private static String convertToS3(String location) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("try convert location to s3 prefix: " + location);
+    private static StorageProperties 
findStorageProperties(StorageProperties.Type type, String schema,
+                                                           
Map<StorageProperties.Type, StorageProperties>
+                                                                   
storagePropertiesMap) {
+        // Step 1: Try direct match by type
+        StorageProperties props = storagePropertiesMap.get(type);
+        if (props != null) {
+            return props;
         }
-        int pos = findDomainPos(location);
-        return "s3" + location.substring(pos);
-    }
 
-    private static int findDomainPos(String rangeLocation) {
-        int pos = rangeLocation.indexOf("://");
-        if (pos == -1) {
-            throw new RuntimeException("No '://' found in location: " + 
rangeLocation);
+        // Step 2: Fallback - if type is S3 and MinIO is configured, assume 
it's compatible
+        if (type == StorageProperties.Type.S3
+                && 
storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) {
+            return storagePropertiesMap.get(StorageProperties.Type.MINIO);
         }
-        return pos;
+
+        // Step 3: Compatibility fallback based on schema
+        // In previous configurations, the schema name may not strictly match 
the actual storage type.
+        // For example, a COS storage might use the "s3" schema, or an S3 
storage might use the "cos" schema.
+        // To handle such legacy inconsistencies, we try to find a storage 
configuration whose name is "s3".
+        if 
(TFileType.FILE_S3.equals(SchemaTypeMapper.fromSchemaToFileType(schema))) {
+            return storagePropertiesMap.values().stream()
+                    .filter(p -> "s3".equalsIgnoreCase(p.getStorageName()))
+                    .findFirst()
+                    .orElse(null);
+        }
+
+        // Not found
+        return null;
     }
 
-    @VisibleForTesting
-    public static String normalizedHdfsPath(String location, String host, 
boolean enableOssRootPolicy) {
+
+    private static String encodedLocation(String location) {

Review Comment:
   How about moving all static methods to a new class like `PathUtils`?
   There are too many static methods in `LocationPath`, which makes this class 
messy.



##########
fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java:
##########
@@ -75,13 +75,12 @@ public HMSCachedClient getClient() {
         };
 
         ArrayList<String> locations = new ArrayList<String>() {{
-                add("gs://abc/def");
+                add("oss://abc/def");
                 add("s3://abc/def");
                 add("s3a://abc/def");
                 add("s3n://abc/def");
-                add("bos://abc/def");
-                add("oss://abc/def");
                 add("cos://abc/def");
+                add("oss://abc/def");

Review Comment:
   Same entry? `oss://abc/def`



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java:
##########
@@ -338,137 +146,161 @@ private static String parseScheme(String finalLocation) 
{
         return scheme;
     }
 
-    private boolean useS3EndPoint(Map<String, String> props) {
-        if (props.containsKey(ObsProperties.ENDPOINT)
-                || props.containsKey(OssProperties.ENDPOINT)
-                || props.containsKey(CosProperties.ENDPOINT)) {
-            return false;
+    /**
+     * Static factory method to create a LocationPath2 instance.

Review Comment:
   ```suggestion
        * Static factory method to create a LocationPath instance.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -176,7 +176,7 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, 
IcebergSplit icebergSpli
             for (IcebergDeleteFileFilter filter : 
icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new 
TIcebergDeleteFileDesc();
                 String deleteFilePath = filter.getDeleteFilePath();
-                LocationPath locationPath = new LocationPath(deleteFilePath, 
icebergSplit.getConfig());
+                LocationPath locationPath = LocationPath.of(deleteFilePath, 
icebergSplit.getConfig());

Review Comment:
   Getting the config from the Iceberg split is not a good design.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -755,28 +706,31 @@ public LoadingCache<PartitionCacheKey, HivePartition> 
getPartitionCache() {
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> 
partitions, Map<String, String> txnValidIds,
-            boolean isFullAcid, String bindBrokerName) {
+                                                      boolean isFullAcid, 
String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         try {
             if (partitions.isEmpty()) {
                 return fileCacheValues;
             }
-
-            Map<String, String> properties = 
catalog.getCatalogProperty().getProperties();
             for (HivePartition partition : partitions) {
                 //Get filesystem multiple times, Reason: 
https://github.com/apache/doris/pull/23409.
+                LocationPath locationPath = 
LocationPath.of(partition.getPath(),
+                        
catalog.getCatalogProperty().getStoragePropertiesMap());
+                // Use the bind broker name to get the file system, so that 
the file system can be shared
                 RemoteFileSystem fileSystem = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                         new FileSystemCache.FileSystemCacheKey(
-                                
LocationPath.getFSIdentity(partition.getPath(), properties, bindBrokerName),
-                                properties, bindBrokerName, jobConf));
-
-                AuthenticationConfig authenticationConfig = 
AuthenticationConfig.getKerberosConfig(jobConf);
+                                locationPath.getNormalizedLocation(),
+                                locationPath.getStorageProperties()));
+                // consider other methods to get the authenticator
+                AuthenticationConfig authenticationConfig = 
AuthenticationConfig.getKerberosConfig(locationPath

Review Comment:
   No need to create a new `AuthenticationConfig` and `HadoopAuthenticator` every time.



##########
fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java:
##########
@@ -97,21 +69,14 @@ public boolean equals(Object obj) {
                 return false;
             }
             FileSystemCacheKey o = (FileSystemCacheKey) obj;
-            boolean equalsWithoutBroker = type.equals(o.type)
-                    && fsIdent.equals(o.fsIdent)
-                    && properties.equals(o.properties);
-            if (bindBrokerName == null) {
-                return equalsWithoutBroker && o.bindBrokerName == null;
-            }
-            return equalsWithoutBroker && 
bindBrokerName.equals(o.bindBrokerName);
+            //fixme 需要重写吗

Review Comment:
   English



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java:
##########
@@ -274,44 +80,46 @@ public TFileType getTFileTypeForBE() {
      * @return BE scan range path
      */
     public Path toStorageLocation() {
-        switch (scheme) {
-            case S3:
-            case S3A:
-            case S3N:
-            case COS:
-            case OSS:
-            case OBS:
-            case BOS:
-            case GCS:
-            case COSN:
-                // All storage will use s3 client to access on BE, so need 
convert to s3
-                return new Path(convertToS3(location));
-            case HDFS:
-            case OSS_HDFS:
-            case VIEWFS:
-            case GFS:
-            case JFS:
-            case OFS:
-            case LOCAL:
-            default:
-                return getPath();
-        }
+        return new Path(normalizedLocation);
     }
 
-    public Scheme getScheme() {
-        return scheme;
-    }
 
-    public String get() {
-        return location;
+    public FileSystemType getFileSystemType() {
+        return SchemaTypeMapper.fromSchemaToFileSystemType(schema);
     }
 
-    public Path getPath() {
-        return new Path(location);
-    }
 
-    public boolean isBindBroker() {
-        return isBindBroker;
+    /**
+     * URI schema, e.g., "s3", "hdfs", "file"
+     */
+    private final String schema;

Review Comment:
   Move the field declarations to the beginning of the class.



##########
fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java:
##########
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum SchemaTypeMapper {
+
+    S3("s3", StorageProperties.Type.S3, FileSystemType.S3, TFileType.FILE_S3),
+    S3A("s3a", StorageProperties.Type.S3, FileSystemType.S3, 
TFileType.FILE_S3),
+    S3N("s3n", StorageProperties.Type.S3, FileSystemType.S3, 
TFileType.FILE_S3),
+    COSN("cosn", StorageProperties.Type.COS, FileSystemType.S3, 
TFileType.FILE_S3),
+    OFS("ofs", StorageProperties.Type.BROKER, FileSystemType.OFS, 
TFileType.FILE_BROKER),
+    GFS("gfs", StorageProperties.Type.BROKER, FileSystemType.HDFS, 
TFileType.FILE_BROKER),
+    JFS("jfs", StorageProperties.Type.BROKER, FileSystemType.JFS, 
TFileType.FILE_BROKER),
+    VIEWFS("viewfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS),
+    //LAKEFS("lakefs", StorageProperties.Type.LAKEFS),
+    //GCS("gs", StorageProperties.Type.S3),
+    //BOS("bos", StorageProperties.Type.BOS),
+    FILE("file", StorageProperties.Type.LOCAL, FileSystemType.FILE, 
TFileType.FILE_LOCAL),
+
+    OSS("oss", StorageProperties.Type.OSS, FileSystemType.S3, 
TFileType.FILE_S3),
+    OBS("obs", StorageProperties.Type.OBS, FileSystemType.S3, 
TFileType.FILE_S3),
+    COS("cos", StorageProperties.Type.COS, FileSystemType.S3, 
TFileType.FILE_S3),
+    //MINIO("minio", StorageProperties.Type.MINIO),
+    /*
+     * Only secure protocols are supported to ensure safe access to Azure 
storage services.
+     * This implementation allows only "abfss" and "wasbs" schemes, which 
operate over HTTPS.
+     * Insecure or deprecated schemes such as "abfs", "wasb", and "adl" are 
explicitly unsupported.
+     * */
+    ABFSS("abfss", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    WASBS("wasbs", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    AZURE("azure", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    HDFS("hdfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS),
+    LOCAL("local", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS);
+    private final String schema;
+    private final StorageProperties.Type storageType;
+    private final FileSystemType fileSystemType;
+    private final TFileType fileType;
+
+    SchemaTypeMapper(String schema, StorageProperties.Type storageType, 
FileSystemType fileSystemType,
+                     TFileType fileType) {
+        this.schema = schema;
+        this.storageType = storageType;
+        this.fileSystemType = fileSystemType;
+        this.fileType = fileType;
+    }
+
+
+    private static final Map<String, StorageProperties.Type> 
SCHEMA_TO_TYPE_MAP = new HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.storageType);
+        }
+    }
+
+    private static final Map<String, FileSystemType> SCHEMA_TO_FS_TYPE_MAP = 
new HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_FS_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.fileSystemType);
+        }
+    }
+
+    private static final Map<String, TFileType> SCHEMA_TO_FILE_TYPE_MAP = new 
HashMap<>();
+
+    static {
+        for (SchemaTypeMapper mapper : values()) {
+            SCHEMA_TO_FILE_TYPE_MAP.put(mapper.schema.toLowerCase(), 
mapper.fileType);
+        }
+    }
+
+    /*
+     * Compatibility note:
+     * When processing HDFS paths, if the URI lacks a schema (protocol),
+     * it is assumed to be of "hdfs" type by default. This is a compatibility 
sacrifice
+     * made to support legacy behaviors.
+     *
+     * Legacy systems often omitted the schema in HDFS paths, e.g. 
"/user/hadoop/data"
+     * instead of "hdfs:///user/hadoop/data". To avoid breaking existing code,
+     * this default assumption is applied for smoother compatibility and 
migration.
+     */
+    public static StorageProperties.Type fromSchema(String schema) {
+        if (StringUtils.isBlank(schema)) {
+            return StorageProperties.Type.HDFS;

Review Comment:
   We should also consider the local file system.
   For a path like `/user/hadoop/data`, how do we distinguish between a local 
path and an HDFS path?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -505,7 +505,9 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, 
List<String> partitionV
 
         // no base file, use log file to parse file type
         String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
-        HudiSplit split = new HudiSplit(new LocationPath(agencyPath, 
hmsTable.getCatalogProperties()),
+        LocationPath locationPath;
+        locationPath = LocationPath.of(agencyPath, 
hmsTable.getStoragePropertiesMap());

Review Comment:
   ```suggestion
           LocationPath locationPath = LocationPath.of(agencyPath, 
hmsTable.getStoragePropertiesMap());
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java:
##########
@@ -107,9 +131,27 @@ public Map<String, String> getHadoopProperties() {
 
     public void addProperty(String key, String val) {
         this.properties.put(key, val);
+        this.storagePropertiesMap = null; // reset storage properties map
     }
 
     public void deleteProperty(String key) {
         this.properties.remove(key);
+        this.storagePropertiesMap = null;
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        reInitCatalogStorageProperties();

Review Comment:
   Why do we need to call this in `gsonPostProcess()`?
   Why not just let the `storagePropertiesMap` be lazily initialized?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to