This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6e35555a83d branch-3.1: [feat](storage)Support Azure Blob Storage 
#56861 (#57219)
6e35555a83d is described below

commit 6e35555a83d9b67a40ca5fbbe0110fae6ea28980
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Oct 30 17:44:54 2025 +0800

    branch-3.1: [feat](storage)Support Azure Blob Storage #56861 (#57219)
    
    cherry pick #56861
    
    ---------
    
    Co-authored-by: Pxl <[email protected]>
---
 be/src/io/file_factory.h                           |   2 +
 be/src/io/fs/azure_obj_storage_client.cpp          |   6 +-
 fe/fe-core/pom.xml                                 |   4 +
 .../java/org/apache/doris/analysis/BrokerDesc.java |   2 +-
 .../org/apache/doris/analysis/StorageBackend.java  |  25 ++-
 .../org/apache/doris/common/util/LocationPath.java |  15 ++
 .../doris/datasource/hive/HMSTransaction.java      |  61 ++++---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |   5 +-
 .../property/storage/AzureProperties.java          |  53 ++++--
 .../property/storage/AzurePropertyUtils.java       | 189 +++++++++++++++++++++
 .../property/storage/S3PropertyUtils.java          |   8 +-
 .../property/storage/StorageProperties.java        |   1 +
 .../java/org/apache/doris/fs/SchemaTypeMapper.java |   3 +-
 .../org/apache/doris/fs/obj/AzureObjStorage.java   |  95 ++++++++++-
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |   2 +-
 .../apache/doris/fs/remote/AzureFileSystem.java    |  19 ++-
 .../org/apache/doris/fs/remote/ObjFileSystem.java  |  26 ++-
 .../org/apache/doris/fs/remote/S3FileSystem.java   |  24 +++
 .../org/apache/doris/planner/HiveTableSink.java    |   3 +-
 .../org/apache/doris/planner/IcebergTableSink.java |   5 +-
 .../property/storage/AzurePropertiesTest.java      |  28 +--
 .../property/storage/AzurePropertyUtilsTest.java   | 145 ++++++++++++++++
 fe/pom.xml                                         |  18 ++
 23 files changed, 635 insertions(+), 104 deletions(-)

diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 603afdee17d..ae6c10cdec8 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -113,6 +113,8 @@ public:
             return TFileType::FILE_LOCAL;
         case TStorageBackendType::S3:
             return TFileType::FILE_S3;
+        case TStorageBackendType::AZURE:
+            return TFileType::FILE_S3;
         case TStorageBackendType::BROKER:
             return TFileType::FILE_BROKER;
         case TStorageBackendType::HDFS:
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp 
b/be/src/io/fs/azure_obj_storage_client.cpp
index ee4b8f7ac89..d27df50350f 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -43,6 +43,7 @@
 #include "common/status.h"
 #include "io/fs/obj_storage_client.h"
 #include "util/bvar_helper.h"
+#include "util/coding.h"
 #include "util/s3_util.h"
 
 using namespace Azure::Storage::Blobs;
@@ -54,8 +55,9 @@ std::string wrap_object_storage_path_msg(const 
doris::io::ObjectStoragePathOptio
 }
 
 auto base64_encode_part_num(int part_num) {
-    return Aws::Utils::HashingUtils::Base64Encode(
-            {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
+    uint8_t buf[4];
+    doris::encode_fixed32_le(buf, static_cast<uint32_t>(part_num));
+    return Aws::Utils::HashingUtils::Base64Encode({buf, sizeof(buf)});
 }
 
 template <typename Func>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index e7ed575562e..7fced3d51d8 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -125,6 +125,10 @@ under the License.
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-azure</artifactId>
+        </dependency>
         <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
         <dependency>
             <groupId>commons-lang</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 24808b2f243..a9a782e80fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -93,7 +93,7 @@ public class BrokerDesc extends StorageDesc implements 
Writable {
                 // Create primary storage properties from the given 
configuration
                 this.storageProperties = 
StorageProperties.createPrimary(this.properties);
                 // Override the storage type based on property configuration
-                this.storageType = 
StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+                this.storageType = 
StorageBackend.StorageType.valueOfIgnoreCase(storageProperties.getStorageName());
             } catch (StoragePropertiesException e) {
                 // Currently ignored: these properties might be 
broker-specific.
                 // Just keep the storage type as BROKER, and try to create 
BrokerProperties
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 9fa2a1203bf..690f43e2f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -129,6 +129,16 @@ public class StorageBackend implements ParseNode {
             return description;
         }
 
+        public static StorageBackend.StorageType valueOfIgnoreCase(String 
name) {
+            for (StorageBackend.StorageType type : 
StorageBackend.StorageType.values()) {
+                if (type.name().equalsIgnoreCase(name)) {
+                    return type;
+                }
+            }
+            throw new IllegalArgumentException("No enum constant "
+                    + StorageBackend.StorageType.class.getCanonicalName() + 
"." + name);
+        }
+
         public TStorageBackendType toThrift() {
             switch (this) {
                 case S3:
@@ -162,21 +172,6 @@ public class StorageBackend implements ParseNode {
          */
         public static final Set<StorageType> REFACTOR_STORAGE_TYPES =
                 ImmutableSet.of(StorageType.S3, StorageType.HDFS, 
StorageType.OFS, StorageType.JFS, StorageType.AZURE);
-
-        public static StorageType convertToStorageType(String storageName) {
-            switch (storageName.toLowerCase()) {
-                case "hdfs":
-                    return StorageType.HDFS;
-                case "s3":
-                    return StorageType.S3;
-                case "jfs":
-                    return StorageType.JFS;
-                case "local":
-                    return StorageType.LOCAL;
-                default:
-                    throw new IllegalArgumentException("Invalid storage type: 
" + storageName);
-            }
-        }
     }
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 9830e627779..67aa9426d5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -185,6 +185,21 @@ public class LocationPath {
         }
     }
 
+    public static LocationPath of(String location,
+                                  StorageProperties storageProperties) {
+        try {
+            String schema = extractScheme(location);
+            String normalizedLocation = 
storageProperties.validateAndNormalizeUri(location);
+            String encodedLocation = encodedLocation(normalizedLocation);
+            URI uri = URI.create(encodedLocation);
+            String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://"
+                    + Strings.nullToEmpty(uri.getAuthority());
+            return new LocationPath(schema, normalizedLocation, fsIdentifier, 
storageProperties);
+        } catch (UserException e) {
+            throw new StoragePropertiesException("Failed to create 
LocationPath for location: " + location, e);
+        }
+    }
+
     /**
      * Extracts the URI scheme (e.g., "s3", "hdfs") from the location string.
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index c151b36fcd3..4bd60ecb96c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -31,6 +31,7 @@ import 
org.apache.doris.datasource.statistics.CommonStatistics;
 import org.apache.doris.fs.FileSystem;
 import org.apache.doris.fs.FileSystemProvider;
 import org.apache.doris.fs.FileSystemUtil;
+import org.apache.doris.fs.remote.ObjFileSystem;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.fs.remote.SwitchingFileSystem;
@@ -61,8 +62,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.util.ArrayList;
@@ -230,7 +229,7 @@ public class HMSTransaction implements Transaction {
             if (pu.getS3MpuPendingUploads() != null) {
                 for (TS3MPUPendingUpload s3MPUPendingUpload : 
pu.getS3MpuPendingUploads()) {
                     uncompletedMpuPendingUploads.add(
-                            new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, 
pu.getLocation().getTargetPath()));
+                            new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, 
pu.getLocation().getWritePath()));
                 }
             }
         }
@@ -1198,7 +1197,7 @@ public class HMSTransaction implements Transaction {
                         tableAndMore.getFileNames());
             } else {
                 if 
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
-                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                    objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
                             tableAndMore.hivePartitionUpdate, targetPath);
                 }
             }
@@ -1242,7 +1241,7 @@ public class HMSTransaction implements Transaction {
             } else {
                 if 
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
                     s3cleanWhenSuccess.add(targetPath);
-                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                    objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
                             tableAndMore.hivePartitionUpdate, targetPath);
                 }
             }
@@ -1271,7 +1270,7 @@ public class HMSTransaction implements Transaction {
                         () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
             } else {
                 if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
-                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                    objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
                             partitionAndMore.hivePartitionUpdate, targetPath);
                 }
             }
@@ -1315,7 +1314,7 @@ public class HMSTransaction implements Transaction {
                         partitionAndMore.getFileNames());
             } else {
                 if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
-                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                    objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
                             partitionAndMore.hivePartitionUpdate, targetPath);
                 }
             }
@@ -1399,7 +1398,7 @@ public class HMSTransaction implements Transaction {
             } else {
                 if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
                     s3cleanWhenSuccess.add(targetPath);
-                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                    objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
                             partitionAndMore.hivePartitionUpdate, targetPath);
                 }
             }
@@ -1621,21 +1620,35 @@ public class HMSTransaction implements Transaction {
         summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
     }
 
-    private void s3Commit(Executor fileSystemExecutor, 
List<CompletableFuture<?>> asyncFileSystemTaskFutures,
+    /**
+     * Commits object storage partition updates (e.g., for S3, Azure Blob, 
etc.).
+     *
+     * <p>In object storage systems, the write workflow is typically divided 
into two stages:
+     * <ul>
+     *   <li><b>Upload (Stage) Phase</b> – Performed by the BE (Backend).
+     *       During this phase, data parts (for S3) or staged blocks (for 
Azure) are uploaded to
+     *       the storage system.</li>
+     *   <li><b>Commit Phase</b> – Performed by the FE (Frontend).
+     *       The FE is responsible for finalizing the uploads initiated by the 
BE:
+     *       <ul>
+     *         <li>For <b>S3</b>: the FE calls {@code completeMultipartUpload} 
to merge all uploaded parts into a
+     *         single object.</li>
+     *         <li>For <b>Azure Blob</b>: the BE stages blocks, and the FE 
performs the final commit to seal
+     *         the blob.</li>
+     *       </ul>
+     *   </li>
+     * </ul>
+     *
+     * <p>This method is executed by the FE and ensures that all uploads 
initiated by the BE
+     * are properly committed and finalized on the object storage side.
+     */
+    private void objCommit(Executor fileSystemExecutor, 
List<CompletableFuture<?>> asyncFileSystemTaskFutures,
             AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate 
hivePartitionUpdate, String path) {
         List<TS3MPUPendingUpload> s3MpuPendingUploads = 
hivePartitionUpdate.getS3MpuPendingUploads();
         if (isMockedPartitionUpdate) {
             return;
         }
-
-        S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) 
fs).fileSystem(path);
-        S3Client s3Client;
-        try {
-            s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
-        } catch (UserException e) {
-            throw new RuntimeException(e);
-        }
-
+        ObjFileSystem fileSystem = (ObjFileSystem) ((SwitchingFileSystem) 
fs).fileSystem(path);
         for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
             asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
                 if (fileSystemTaskCancelled.get()) {
@@ -1644,15 +1657,13 @@ public class HMSTransaction implements Transaction {
                 List<CompletedPart> completedParts = Lists.newArrayList();
                 for (Map.Entry<Integer, String> entry : 
s3MPUPendingUpload.getEtags().entrySet()) {
                     
completedParts.add(CompletedPart.builder().eTag(entry.getValue()).partNumber(entry.getKey())
-                            .build());
+                              .build());
                 }
 
-                
s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
-                        .bucket(s3MPUPendingUpload.getBucket())
-                        .key(s3MPUPendingUpload.getKey())
-                        .uploadId(s3MPUPendingUpload.getUploadId())
-                        
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
-                        .build());
+                
fileSystem.completeMultipartUpload(s3MPUPendingUpload.getBucket(),
+                         s3MPUPendingUpload.getKey(),
+                         s3MPUPendingUpload.getUploadId(),
+                         s3MPUPendingUpload.getEtags());
                 uncompletedMpuPendingUploads.remove(new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, path));
             }, fileSystemExecutor));
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d03593ab2ef..0c011c386ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -379,9 +379,8 @@ public class HiveMetaStoreCache {
         if (status.ok()) {
             for (RemoteFile remoteFile : remoteFiles) {
                 String srcPath = remoteFile.getPath().toString();
-                LocationPath remoteFileLocationPath = LocationPath.of(srcPath, 
catalog.getCatalogProperty()
-                        .getStoragePropertiesMap());
-                result.addFile(remoteFile, remoteFileLocationPath);
+                LocationPath srcLocationPath = LocationPath.of(srcPath, 
locationPath.getStorageProperties());
+                result.addFile(remoteFile, srcLocationPath);
             }
         } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
             // User may manually remove partition under HDFS, in this case,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
index 581ca235e31..38315f00077 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
@@ -24,6 +24,7 @@ import org.apache.doris.datasource.property.ConnectorProperty;
 import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -64,18 +65,20 @@ public class AzureProperties extends StorageProperties {
 
 
     @Getter
-    @ConnectorProperty(names = {"azure.access_key", "s3.access_key", 
"AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+    @ConnectorProperty(names = {"azure.account_name", "azure.access_key", 
"s3.access_key",
+            "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
             description = "The access key of S3.")
-    protected String accessKey = "";
+    protected String accountName = "";
 
     @Getter
-    @ConnectorProperty(names = {"azure.secret_key", "s3.secret_key", 
"AWS_SECRET_KEY", "secret_key"},
+    @ConnectorProperty(names = {"azure.account_key", "azure.secret_key", 
"s3.secret_key",
+            "AWS_SECRET_KEY", "secret_key"},
             sensitive = true,
             description = "The secret key of S3.")
-    protected String secretKey = "";
+    protected String accountKey = "";
 
     @Getter
-    @ConnectorProperty(names = {"azure.bucket", "s3.bucket"},
+    @ConnectorProperty(names = {"container", "azure.bucket", "s3.bucket"},
             required = false,
             description = "The container of Azure blob.")
     protected String container = "";
@@ -108,7 +111,7 @@ public class AzureProperties extends StorageProperties {
             throw new IllegalArgumentException(String.format("Endpoint '%s' is 
not valid. It should end with '%s'.",
                     endpoint, AZURE_ENDPOINT_SUFFIX));
         }
-        this.endpoint = formatAzureEndpoint(endpoint, accessKey);
+        this.endpoint = formatAzureEndpoint(endpoint, accountName);
     }
 
     public static boolean guessIsMe(Map<String, String> origProps) {
@@ -133,8 +136,8 @@ public class AzureProperties extends StorageProperties {
         Map<String, String> s3Props = new HashMap<>();
         s3Props.put("AWS_ENDPOINT", endpoint);
         s3Props.put("AWS_REGION", "dummy_region");
-        s3Props.put("AWS_ACCESS_KEY", accessKey);
-        s3Props.put("AWS_SECRET_KEY", secretKey);
+        s3Props.put("AWS_ACCESS_KEY", accountName);
+        s3Props.put("AWS_SECRET_KEY", accountKey);
         s3Props.put("AWS_NEED_OVERRIDE_ENDPOINT", "true");
         s3Props.put("provider", "azure");
         s3Props.put("use_path_style", usePathStyle);
@@ -155,24 +158,46 @@ public class AzureProperties extends StorageProperties {
 
     @Override
     public String validateAndNormalizeUri(String url) throws UserException {
-        return S3PropertyUtils.validateAndNormalizeUri(url, usePathStyle, 
forceParsingByStandardUrl);
+        return AzurePropertyUtils.validateAndNormalizeUri(url);
 
     }
 
     @Override
     public String validateAndGetUri(Map<String, String> loadProps) throws 
UserException {
-        return S3PropertyUtils.validateAndGetUri(loadProps);
+        return AzurePropertyUtils.validateAndGetUri(loadProps);
     }
 
     @Override
     public String getStorageName() {
-        return "Azure";
+        return "AZURE";
     }
 
     @Override
     public void initializeHadoopStorageConfig() {
-        // Azure does not require any special Hadoop configuration for S3 
compatibility.
-        // The properties are already set in the getBackendConfigProperties 
method.
-        // This method will be removed in the future when FileIO is fully 
implemented.
+        hadoopStorageConfig = new Configuration();
+        //disable azure cache
+        // Disable all Azure ABFS/WASB FileSystem caching to ensure fresh 
instances per configuration
+        for (String scheme : new String[]{"abfs", "abfss", "wasb", "wasbs"}) {
+            hadoopStorageConfig.set("fs." + scheme + ".impl.disable.cache", 
"true");
+        }
+        origProps.forEach((k, v) -> {
+            if (k.startsWith("fs.azure.")) {
+                hadoopStorageConfig.set(k, v);
+            }
+        });
+        setAzureAccountKeys(hadoopStorageConfig, accountName, accountKey);
     }
+
+    private static void setAzureAccountKeys(Configuration conf, String 
accountName, String accountKey) {
+        String[] endpoints = {
+                "dfs.core.windows.net",
+                "blob.core.windows.net"
+        };
+        for (String endpoint : endpoints) {
+            String key = String.format("fs.azure.account.key.%s.%s", 
accountName, endpoint);
+            conf.set(key, accountKey);
+        }
+        conf.set("fs.azure.account.key", accountKey);
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java
new file mode 100644
index 00000000000..8c986b74da0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class AzurePropertyUtils {
+
+    /**
+     * Validates and normalizes an Azure Blob Storage URI into a unified 
{@code s3://}-style format.
+     * <p>
+     * This method supports the following URI formats:
+     * <ul>
+     *   <li>HDFS-style Azure URIs: {@code wasb://}, {@code wasbs://}, {@code 
abfs://}, {@code abfss://}</li>
+     *   <li>HTTPS-style Azure Blob URLs: {@code 
https://<account>.blob.core.windows.net/<container>/<path>}</li>
+     * </ul>
+     * <p>
+     * The normalized output will always be in the form of:
+     * <pre>{@code
+     * s3://<container>/<path>
+     * }</pre>
+     * <p>
+     * Examples:
+     * <ul>
+     *   <li>{@code 
wasbs://container@account.blob.core.windows.net/data/file.txt}
+     *       → {@code s3://container/data/file.txt}</li>
+     *   <li>{@code https://account.blob.core.windows.net/container/file.csv}
+     *       → {@code s3://container/file.csv}</li>
+     * </ul>
+     *
+     * @param path the input Azure URI string to be validated and normalized
+     * @return a normalized {@code s3://}-style URI
+     * @throws StoragePropertiesException if the URI is blank, invalid, or 
unsupported
+     */
+    public static String validateAndNormalizeUri(String path) throws 
UserException {
+
+        if (StringUtils.isBlank(path)) {
+            throw new StoragePropertiesException("Path cannot be null or 
empty");
+        }
+
+        String lower = path.toLowerCase();
+
+        // Only accept Azure Blob Storage-related URI schemes
+        if (!(lower.startsWith("wasb://") || lower.startsWith("wasbs://")
+                || lower.startsWith("abfs://") || lower.startsWith("abfss://")
+                || lower.startsWith("https://") || lower.startsWith("http://")
+                || lower.startsWith("s3://"))) {
+            throw new StoragePropertiesException("Unsupported Azure URI 
scheme: " + path);
+        }
+
+        return convertToS3Style(path);
+    }
+
+    /**
+     * Converts an Azure Blob Storage URI into a unified {@code 
s3://<container>/<path>} format.
+     * <p>
+     * This method recognizes both:
+     * <ul>
+     *   <li>HDFS-style Azure URIs ({@code wasb://}, {@code wasbs://}, {@code 
abfs://}, {@code abfss://})</li>
+     *   <li>HTTPS-style Azure Blob URLs ({@code 
https://<account>.blob.core.windows.net/...})</li>
+     * </ul>
+     * <p>
+     * It throws an exception if the URI is invalid or does not match Azure 
Blob Storage patterns.
+     *
+     * @param uri the original Azure URI string
+     * @return the normalized {@code s3://<container>/<path>} string
+     * @throws StoragePropertiesException if the URI is invalid or unsupported
+     */
+    private static String convertToS3Style(String uri) {
+        if (StringUtils.isBlank(uri)) {
+            throw new StoragePropertiesException("URI is blank");
+        }
+
+        String lowerUri = uri.toLowerCase();
+        if (lowerUri.startsWith("s3://")) {
+            return lowerUri;
+        }
+        // Handle Azure HDFS-style URIs (wasb://, wasbs://, abfs://, abfss://)
+        if (lowerUri.startsWith("wasb://") || lowerUri.startsWith("wasbs://")
+                || lowerUri.startsWith("abfs://") || 
lowerUri.startsWith("abfss://")) {
+
+            // Example: 
wasbs://container@account.blob.core.windows.net/path/file.txt
+            String schemeRemoved = uri.replaceFirst("^[a-z]+s?://", "");
+            int atIndex = schemeRemoved.indexOf('@');
+            if (atIndex < 0) {
+                throw new StoragePropertiesException("Invalid Azure URI, 
missing '@': " + uri);
+            }
+
+            // Extract container name (before '@')
+            String container = schemeRemoved.substring(0, atIndex);
+
+            // Extract remaining part after '@'
+            String remainder = schemeRemoved.substring(atIndex + 1);
+            int slashIndex = remainder.indexOf('/');
+
+            // Extract the path part if it exists
+            String path = (slashIndex != -1) ? remainder.substring(slashIndex 
+ 1) : "";
+
+            // Normalize to s3-style URI: s3://<container>/<path>
+            return StringUtils.isBlank(path)
+                    ? String.format("s3://%s", container)
+                    : String.format("s3://%s/%s", container, path);
+        }
+
+        // ② Handle HTTPS/HTTP Azure Blob Storage URLs
+        if (lowerUri.startsWith("https://") || lowerUri.startsWith("http://")) 
{
+            try {
+                URI parsed = new URI(uri);
+                String host = parsed.getHost();
+                String path = parsed.getPath();
+
+                if (StringUtils.isBlank(host)) {
+                    throw new StoragePropertiesException("Invalid Azure HTTPS 
URI, missing host: " + uri);
+                }
+
+                // Typical Azure Blob domain: <account>.blob.core.windows.net
+                if (!host.contains(".blob.core.windows.net")) {
+                    throw new StoragePropertiesException("Not an Azure Blob 
URL: " + uri);
+                }
+
+                // Path usually looks like: /<container>/<path>
+                String[] parts = path.split("/", 3);
+                if (parts.length < 2) {
+                    throw new StoragePropertiesException("Invalid Azure Blob 
URL, missing container: " + uri);
+                }
+
+                String container = parts[1];
+                String remainder = (parts.length == 3) ? parts[2] : "";
+
+                // Convert HTTPS URL to s3-style format
+                return StringUtils.isBlank(remainder)
+                        ? String.format("s3://%s", container)
+                        : String.format("s3://%s/%s", container, remainder);
+
+            } catch (URISyntaxException e) {
+                throw new StoragePropertiesException("Invalid HTTPS URI: " + 
uri, e);
+            }
+        }
+
+        throw new StoragePropertiesException("Unsupported Azure URI scheme: " 
+ uri);
+    }
+
+    /**
+     * Extracts and validates the "uri" entry from a properties map.
+     *
+     * <p>Example:
+     * <pre>
+     * Input : {"uri": 
"wasb://container@account.blob.core.windows.net/dir/file.txt"}
+     * Output: "wasb://container@account.blob.core.windows.net/dir/file.txt"
+     * </pre>
+     *
+     * @param props the configuration map expected to contain a "uri" key
+     * @return the URI string from the map
+     * @throws StoragePropertiesException if the map is empty or missing the 
"uri" key
+     */
+    public static String validateAndGetUri(Map<String, String> props) {
+        if (props == null || props.isEmpty()) {
+            throw new StoragePropertiesException("Properties map cannot be 
null or empty");
+        }
+
+        return props.entrySet().stream()
+                .filter(e -> 
StorageProperties.URI_KEY.equalsIgnoreCase(e.getKey()))
+                .map(Map.Entry::getValue)
+                .findFirst()
+                .orElseThrow(() -> new StoragePropertiesException("Properties 
must contain 'uri' key"));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
index 49c7c5612a7..99064b4e2e2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
@@ -33,8 +33,6 @@ import java.util.Optional;
 public class S3PropertyUtils {
     private static final Logger LOG = 
LogManager.getLogger(S3PropertyUtils.class);
 
-    private static final String URI_KEY = "uri";
-
     /**
      * Constructs the S3 endpoint from a given URI in the props map.
      *
@@ -51,7 +49,7 @@ public class S3PropertyUtils {
                                                   String stringUsePathStyle,
                                                   String 
stringForceParsingByStandardUri) {
         Optional<String> uriOptional = props.entrySet().stream()
-                .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+                .filter(e -> 
e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
                 .map(Map.Entry::getValue)
                 .findFirst();
 
@@ -90,7 +88,7 @@ public class S3PropertyUtils {
                                                 String stringUsePathStyle,
                                                 String 
stringForceParsingByStandardUri) {
         Optional<String> uriOptional = props.entrySet().stream()
-                .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+                .filter(e -> 
e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
                 .map(Map.Entry::getValue)
                 .findFirst();
 
@@ -160,7 +158,7 @@ public class S3PropertyUtils {
             throw new StoragePropertiesException("props is empty");
         }
         Optional<String> uriOptional = props.entrySet().stream()
-                .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+                .filter(e -> 
e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
                 .map(Map.Entry::getValue)
                 .findFirst();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 2ce87f9ffb9..ee75bc3b63d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -47,6 +47,7 @@ public abstract class StorageProperties extends 
ConnectionProperties {
     public static final String FS_OSS_HDFS_SUPPORT = "fs.oss-hdfs.support";
     public static final String FS_LOCAL_SUPPORT = "fs.local.support";
     public static final String DEPRECATED_OSS_HDFS_SUPPORT = 
"oss.hdfs.enabled";
+    protected static final String URI_KEY = "uri";
 
     public static final String FS_PROVIDER_KEY = "provider";
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
index 03240f74367..a89e8be1ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
@@ -72,9 +72,10 @@ public enum SchemaTypeMapper {
      * This implementation maps the "abfs", "abfss", "wasb" and "wasbs" schemes to Azure storage.
      * The deprecated "adl" scheme remains unsupported.
      * */
+    ABFS("abfs", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
     ABFSS("abfss", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
+    WASB("wasb", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
     WASBS("wasbs", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
-    AZURE("azure", StorageProperties.Type.AZURE, FileSystemType.S3, 
TFileType.FILE_S3),
     HDFS("hdfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS),
     LOCAL("local", StorageProperties.Type.HDFS, FileSystemType.HDFS, 
TFileType.FILE_HDFS);
     //LAKEFS("lakefs", StorageProperties.Type.LAKEFS),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
index 17f2d3b3439..3e014f3db3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -29,6 +29,7 @@ import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.http.rest.PagedResponse;
 import com.azure.core.http.rest.Response;
 import com.azure.core.util.Context;
+import com.azure.core.util.IterableStream;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
@@ -38,11 +39,13 @@ import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.batch.BlobBatchClientBuilder;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobItemProperties;
 import com.azure.storage.blob.models.BlobProperties;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.models.ListBlobsOptions;
 import com.azure.storage.blob.specialized.BlockBlobClient;
 import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.http.HttpStatus;
 import org.apache.logging.log4j.LogManager;
@@ -52,6 +55,8 @@ import org.jetbrains.annotations.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -59,7 +64,10 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
     private static final Logger LOG = 
LogManager.getLogger(AzureObjStorage.class);
@@ -94,8 +102,8 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
     @Override
     public BlobServiceClient getClient() throws UserException {
         if (client == null) {
-            StorageSharedKeyCredential cred = new 
StorageSharedKeyCredential(azureProperties.getAccessKey(),
-                    azureProperties.getSecretKey());
+            StorageSharedKeyCredential cred = new 
StorageSharedKeyCredential(azureProperties.getAccountName(),
+                    azureProperties.getAccountKey());
             BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
             builder.credential(cred);
             builder.endpoint(azureProperties.getEndpoint());
@@ -258,6 +266,22 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
         }
     }
 
+    /**
+     * Commits the staged blocks of a block blob, finalizing a multipart upload.
+     *
+     * <p>Each part number is turned into a block ID by Base64-encoding its 4-byte
+     * little-endian representation. NOTE(review): this presumably mirrors the block ID
+     * scheme used by whoever staged the blocks -- confirm before changing it.
+     *
+     * <p>Block IDs are committed in ascending part-number order: the order of the committed
+     * block list defines the byte order of the resulting blob, while the iteration order of
+     * an arbitrary {@code Map} is unspecified.
+     *
+     * @param bucket the container holding the blob
+     * @param key    the blob name within the container
+     * @param parts  part number to ETag mapping; only the part numbers are used here
+     * @throws RuntimeException if the blob service client cannot be obtained
+     */
+    public void completeMultipartUpload(String bucket, String key, Map<Integer, String> parts) {
+        BlockBlobClient blockBlobClient;
+        try {
+            blockBlobClient = getClient().getBlobContainerClient(bucket).getBlobClient(key).getBlockBlobClient();
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+        List<String> blockIds = parts.keySet().stream()
+                .sorted()
+                .map(partNumber -> Base64.getEncoder()
+                        .encodeToString(ByteBuffer.allocate(4)
+                                .order(ByteOrder.LITTLE_ENDIAN)
+                                .putInt(partNumber)
+                                .array()))
+                .collect(Collectors.toList());
+        blockBlobClient.commitBlockList(blockIds);
+    }
+
     @Override
     public RemoteObjects listObjects(String remotePath, String 
continuationToken) throws DdlException {
         try {
@@ -294,6 +318,28 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
         return path;
     }
 
+    /**
+     * Collects the sub-directory prefixes found under {@code remotePath}.
+     *
+     * <p>The path is parsed into a container and a key. The key (with a trailing "/" appended
+     * when missing) is listed via {@code listBlobsByHierarchy}, and every returned item that
+     * is flagged as a prefix is added to {@code result} as a full path rebuilt from the
+     * scheme-and-bucket part of {@code remotePath}.
+     *
+     * @param remotePath the directory location to list
+     * @param result     output set receiving the paths of the discovered directories
+     * @return {@code Status.OK} on success; failures are thrown, never returned as a status
+     * @throws RuntimeException wrapping any exception raised while parsing or listing
+     */
+    public Status listDirectories(String remotePath, Set<String> result) {
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            String bucket = uri.getBucket();
+            String key = uri.getKey();
+            // Everything in remotePath that precedes the key, i.e. scheme plus bucket.
+            String schemaAndBucket = remotePath.substring(0, 
remotePath.length() - key.length());
+            String prefix = key.endsWith("/") ? key : key + "/";
+            BlobContainerClient containerClient = 
getClient().getBlobContainerClient(bucket);
+            IterableStream<BlobItem> blobs = 
containerClient.listBlobsByHierarchy(prefix);
+            for (BlobItem blobItem : blobs) {
+                // isPrefix() may be null, hence the null-safe comparison.
+                if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+                    String path = S3ObjStorage.toPath(schemaAndBucket, 
blobItem.getName()).toString();
+                    result.add(path);
+                }
+            }
+            return Status.OK;
+        } catch (Exception e) {
+            throw new RuntimeException("Azure FileSystem list directories 
failed: "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
+        }
+    }
+
     public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
         long roundCnt = 0;
         long elementCnt = 0;
@@ -330,7 +376,7 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
                     java.nio.file.Path blobPath = 
Paths.get(blobItem.getName());
 
                     boolean isPrefix = false;
-                    while 
(blobPath.normalize().toString().startsWith(listPrefix)) {
+                    while (null != blobPath && 
blobPath.normalize().toString().startsWith(listPrefix)) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("get blob {}", 
blobPath.normalize().toString());
                         }
@@ -381,13 +427,51 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
         return st;
     }
 
+    /**
+     * Lists the files under {@code remotePath}, optionally descending into sub-directories.
+     *
+     * <p>Blobs are enumerated through {@code listBlobsByHierarchy}. Items flagged as prefixes
+     * are skipped unless {@code recursive} is set, in which case they are listed by a
+     * recursive call. All other items are appended to {@code result}.
+     *
+     * @param remotePath the directory location to list
+     * @param recursive  whether to descend into sub-directories
+     * @param result     output list receiving one {@code RemoteFile} per blob found
+     * @return {@code Status.OK} on success, or the first non-OK status of a nested listing
+     * @throws RuntimeException wrapping any exception raised while parsing or listing
+     */
+    public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            String bucket = uri.getBucket();
+            String key = uri.getKey();
+            // Everything in remotePath that precedes the key, i.e. scheme plus bucket.
+            String schemaAndBucket = remotePath.substring(0, remotePath.length() - key.length());
+
+            String prefix = key.endsWith("/") ? key : key + "/";
+            BlobContainerClient containerClient = getClient().getBlobContainerClient(bucket);
+            IterableStream<BlobItem> blobs = containerClient.listBlobsByHierarchy(prefix);
+
+            for (BlobItem blobItem : blobs) {
+                if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+                    if (recursive) {
+                        String path = S3ObjStorage.toPath(schemaAndBucket, blobItem.getName()).toString();
+                        Status status = listFiles(path, recursive, result);
+                        if (status != Status.OK) {
+                            return status;
+                        }
+                    }
+                } else {
+                    BlobItemProperties props = blobItem.getProperties();
+                    // getLastModified() is an OffsetDateTime; getSecond() would only yield the
+                    // second-of-minute field (0-59), so convert to an absolute timestamp.
+                    // NOTE(review): epoch milliseconds assumed for RemoteFile's modification
+                    // time -- confirm against the RemoteFile constructor.
+                    long modificationTime = props.getLastModified().toInstant().toEpochMilli();
+                    RemoteFile file = new RemoteFile(
+                            S3ObjStorage.toPath(schemaAndBucket, blobItem.getName()),
+                            false,
+                            props.getContentLength(),
+                            props.getContentLength(),
+                            modificationTime,
+                            null);
+                    result.add(file);
+                }
+            }
+            return Status.OK;
+        } catch (Exception e) {
+            throw new RuntimeException("Azure FileSystem list files failed: "
+                    + ExceptionUtils.getRootCauseMessage(e), e);
+        }
+    }
+
     public PagedResponse<BlobItem> getPagedBlobItems(BlobContainerClient 
client, ListBlobsOptions options,
                                                      String 
newContinuationToken) {
         PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, 
newContinuationToken, null);
         return pagedBlobs.iterableByPage().iterator().next();
     }
 
-
     public Status multipartUpload(String remotePath, @Nullable InputStream 
inputStream, long totalBytes) {
         Status st = Status.OK;
         long uploadedBytes = 0;
@@ -427,6 +511,7 @@ public class AzureObjStorage implements 
ObjStorage<BlobServiceClient> {
     @Override
     public void close() throws Exception {
         // Create a BlobServiceClient instance (thread-safe and reusable).
-       // Note: BlobServiceClient does NOT implement Closeable and does not 
require explicit closing.
+        // Note: BlobServiceClient does NOT implement Closeable and does not 
require explicit closing.
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 238072df3c0..1ae1f32319f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -188,7 +188,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
         return Status.OK;
     }
 
-    private Path toPath(String schemaAndBucket, String key) {
+    protected static Path toPath(String schemaAndBucket, String key) {
         // Ensure inputs are not null
         if (schemaAndBucket == null) {
             schemaAndBucket = "";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
index e9da3b4ee76..c568b6b6f9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
@@ -23,12 +23,16 @@ import 
org.apache.doris.datasource.property.storage.AzureProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.obj.AzureObjStorage;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class AzureFileSystem extends ObjFileSystem {
-
+    private static final Logger LOG = 
LogManager.getLogger(AzureFileSystem.class);
     private final AzureProperties azureProperties;
 
     public AzureFileSystem(AzureProperties azureProperties) {
@@ -44,7 +48,8 @@ public class AzureFileSystem extends ObjFileSystem {
 
     @Override
     public Status listFiles(String remotePath, boolean recursive, 
List<RemoteFile> result) {
-        throw new UnsupportedOperationException("Listing files is not 
supported in Azure File System.");
+        AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+        return azureObjStorage.listFiles(remotePath, recursive, result);
     }
 
     @Override
@@ -55,7 +60,8 @@ public class AzureFileSystem extends ObjFileSystem {
 
     @Override
     public Status listDirectories(String remotePath, Set<String> result) {
-        throw new UnsupportedOperationException("Listing directories is not 
supported in Azure File System.");
+        AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+        return azureObjStorage.listDirectories(remotePath, result);
     }
 
     @Override
@@ -73,4 +79,11 @@ public class AzureFileSystem extends ObjFileSystem {
             }
         }
     }
+
+    /**
+     * Finalizes a multipart upload by delegating to {@link AzureObjStorage}.
+     * The {@code uploadId} argument is not forwarded; only bucket, key and parts are used.
+     */
+    @Override
+    public void completeMultipartUpload(String bucket, String key, String 
uploadId, Map<Integer, String> parts) {
+        AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+        azureObjStorage.completeMultipartUpload(bucket, key, parts);
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
index 989b1632ec4..f04fb6fa01b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
@@ -35,6 +35,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Map;
 
 public abstract class ObjFileSystem extends RemoteFileSystem {
     private static final Logger LOG = 
LogManager.getLogger(ObjFileSystem.class);
@@ -62,9 +63,10 @@ public abstract class ObjFileSystem extends RemoteFileSystem 
{
 
     /**
      * download data from remote file and check data size with expected file 
size.
+     *
      * @param remoteFilePath remote file path
-     * @param localFilePath local file path
-     * @param fileSize download data size
+     * @param localFilePath  local file path
+     * @param fileSize       download data size
      * @return
      */
     @Override
@@ -162,4 +164,24 @@ public abstract class ObjFileSystem extends 
RemoteFileSystem {
     public Status deleteDirectory(String absolutePath) {
         return objStorage.deleteObjects(absolutePath);
     }
+
+
+    /**
+     * Completes a multipart upload operation.
+     *
+     * <p>In object storage systems, large files are often uploaded in multiple parts.
+     * Once all parts have been successfully uploaded, this method is called to merge
+     * them into a single finalized object.
+     *
+     * <p>The main purpose of this method is to notify the underlying storage service
+     * to perform the final merge and make the object available for normal access.
+     *
+     * @param bucket   The name of the target bucket.
+     * @param key      The full object key (path) within the bucket.
+     * @param uploadId The unique identifier of the multipart upload session.
+     *                 Implementations may ignore it when the backing service does not use one.
+     * @param parts    A mapping of part numbers to their corresponding ETag values,
+     *                 used to assemble the parts in the correct order.
+     */
+    public abstract void completeMultipartUpload(String bucket, String key,
+                                                 String uploadId, Map<Integer, 
String> parts);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index c3608b7b35f..1925dc86b8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -28,10 +28,15 @@ import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class S3FileSystem extends ObjFileSystem {
@@ -108,4 +113,23 @@ public class S3FileSystem extends ObjFileSystem {
             }
         }
     }
+
+    /**
+     * Completes an S3 multipart upload from the uploaded part numbers and their ETags.
+     *
+     * <p>The parts are submitted in ascending part-number order, as the S3
+     * CompleteMultipartUpload operation requires; the iteration order of an arbitrary
+     * {@code Map} is unspecified, so the entries are sorted explicitly.
+     *
+     * @param bucket   the target bucket
+     * @param key      the object key within the bucket
+     * @param uploadId the identifier of the multipart upload session
+     * @param parts    part number to ETag mapping of the uploaded parts
+     */
+    @Override
+    public void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> parts) {
+        S3ObjStorage s3ObjStorage = (S3ObjStorage) this.objStorage;
+        List<CompletedPart> completedParts = new ArrayList<>(parts.size());
+        parts.entrySet().stream()
+                .sorted(Map.Entry.comparingByKey())
+                .forEach(entry -> completedParts.add(CompletedPart.builder()
+                        .partNumber(entry.getKey())
+                        .eTag(entry.getValue())
+                        .build()));
+
+        s3ObjStorage.getClient().completeMultipartUpload(CompleteMultipartUploadRequest.builder()
+                .bucket(bucket)
+                .key(key)
+                .uploadId(uploadId)
+                .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
+                .build());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index a1957fbc906..68a0edc430f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -124,12 +124,13 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
         setSerDeProperties(tSink);
 
         THiveLocationParams locationParams = new THiveLocationParams();
+        String originalLocation = sd.getLocation();
         LocationPath locationPath = LocationPath.of(sd.getLocation(), 
targetTable.getStoragePropertiesMap());
         String location = sd.getLocation();
         TFileType fileType = locationPath.getTFileTypeForBE();
         if (fileType == TFileType.FILE_S3) {
             locationParams.setWritePath(locationPath.getNormalizedLocation());
-            
locationParams.setOriginalWritePath(locationPath.getNormalizedLocation());
+            locationParams.setOriginalWritePath(originalLocation);
             locationParams.setTargetPath(locationPath.getNormalizedLocation());
             if (insertCtx.isPresent()) {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 6f53c8b0228..c536844c649 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -148,9 +148,10 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         tSink.setHadoopConfig(props);
 
         // location
-        LocationPath locationPath = 
LocationPath.of(IcebergUtils.dataLocation(icebergTable), storagePropertiesMap);
+        String originalLocation = IcebergUtils.dataLocation(icebergTable);
+        LocationPath locationPath = LocationPath.of(originalLocation, 
storagePropertiesMap);
         tSink.setOutputPath(locationPath.toStorageLocation().toString());
-        tSink.setOriginalOutputPath(locationPath.getPath().toString());
+        tSink.setOriginalOutputPath(originalLocation);
         TFileType fileType = locationPath.getTFileTypeForBE();
         tSink.setFileType(fileType);
         if (fileType.equals(TFileType.FILE_BROKER)) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
index fd36521a828..7542bdbce29 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
@@ -50,11 +50,11 @@ public class AzurePropertiesTest {
 
         // Verify if the properties are correctly parsed
         
Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net";, 
azureProperties.getEndpoint());
-        Assertions.assertEquals("myAzureAccessKey", 
azureProperties.getAccessKey());
-        Assertions.assertEquals("myAzureSecretKey", 
azureProperties.getSecretKey());
+        Assertions.assertEquals("myAzureAccessKey", 
azureProperties.getAccountName());
+        Assertions.assertEquals("myAzureSecretKey", 
azureProperties.getAccountKey());
         Assertions.assertEquals("false", azureProperties.getUsePathStyle());
         Assertions.assertEquals("false", 
azureProperties.getForceParsingByStandardUrl());
-        Assertions.assertEquals("Azure", azureProperties.getStorageName());
+        Assertions.assertEquals("AZURE", azureProperties.getStorageName());
     }
 
     // Test for missing access_key configuration, should throw an exception
@@ -98,26 +98,6 @@ public class AzurePropertiesTest {
 
     }
 
-    // Test for path style when use_path_style is false
-    @Test
-    public void testPathStyleCombinations() throws Exception {
-        origProps.put("s3.endpoint", "https://mystorageaccount.blob.core.windows.net");
-        origProps.put("s3.access_key", "a");
-        origProps.put("s3.secret_key", "b");
-        origProps.put("provider", "azure");
-
-        // By default, use_path_style is false
-        AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
-        Assertions.assertEquals("s3://mystorageaccount/mycontainer/blob.txt",
-                azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
-
-        // Set use_path_style to true
-        origProps.put("use_path_style", "true");
-        azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
-        Assertions.assertEquals("s3://mycontainer/blob.txt",
-                azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
-    }
-
     @Test
     public void testParsingUri() throws Exception {
         origProps.put("s3.endpoint", "https://mystorageaccount.blob.core.windows.net");
@@ -136,7 +116,7 @@ public class AzurePropertiesTest {
         Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt",
                 azureProperties.validateAndGetUri(origProps));
         azureProperties.setUsePathStyle("false");
-        Assertions.assertEquals("s3://mystorageaccount/mycontainer/blob.txt",
+        Assertions.assertEquals("s3://mycontainer/blob.txt",
                 azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
 
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java
new file mode 100644
index 00000000000..dc6eb8ad74c
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link AzurePropertyUtils}: normalization of Azure URIs to the s3-style
+ * form and lookup of the "uri" property.
+ */
+public class AzurePropertyUtilsTest {
+
+    // ---------- validateAndNormalizeUri Tests ----------
+
+    @Test
+    public void testWasbsUri() throws Exception {
+        String input = "wasbs://container@account.blob.core.windows.net/data/file.txt";
+        String expected = "s3://container/data/file.txt";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testS3Uri() throws Exception {
+        String input = "s3://container/data/file.txt";
+        String expected = "s3://container/data/file.txt";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testAbfssUriWithoutPath() throws Exception {
+        String input = "abfss://container@account.blob.core.windows.net";
+        String expected = "s3://container";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testHttpUri() throws Exception {
+        String input = "http://account.blob.core.windows.net/container/folder/file.csv";
+        String expected = "s3://container/folder/file.csv";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testHttpsUriWithoutPath() throws Exception {
+        String input = "https://account.blob.core.windows.net/container";
+        String expected = "s3://container";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testHttpsUriWithPath() throws Exception {
+        String input = "https://account.blob.core.windows.net/container/data/file.parquet";
+        String expected = "s3://container/data/file.parquet";
+        Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testInvalidAzureScheme() {
+        String input = "ftp://container@account.blob.core.windows.net/data/file.txt";
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testMissingAtInWasbUri() {
+        String input = "wasb://container/account.blob.core.windows.net/data";
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testHttpsUriMissingHost() {
+        String input = "https:///container/file.txt"; // missing host
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testHttpsUriNotAzureBlob() {
+        String input = "https://account.otherdomain.com/container/file.txt";
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndNormalizeUri(input));
+    }
+
+    @Test
+    public void testBlankUri() {
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndNormalizeUri(" "));
+    }
+
+    // ---------- validateAndGetUri Tests ----------
+
+    @Test
+    public void testValidateAndGetUriNormal() {
+        Map<String, String> props = new HashMap<>();
+        props.put("uri", "wasbs://container@account.blob.core.windows.net/data/file.txt");
+        Assertions.assertEquals("wasbs://container@account.blob.core.windows.net/data/file.txt",
+                AzurePropertyUtils.validateAndGetUri(props));
+    }
+
+    @Test
+    public void testValidateAndGetUriCaseInsensitive() {
+        Map<String, String> props = new HashMap<>();
+        props.put("URI", "wasbs://container@account.blob.core.windows.net/data/file.txt");
+        Assertions.assertEquals("wasbs://container@account.blob.core.windows.net/data/file.txt",
+                AzurePropertyUtils.validateAndGetUri(props));
+    }
+
+    @Test
+    public void testValidateAndGetUriMissing() {
+        Map<String, String> props = new HashMap<>();
+        props.put("path", "value");
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndGetUri(props));
+    }
+
+    @Test
+    public void testValidateAndGetUriEmptyMap() {
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndGetUri(new HashMap<>()));
+    }
+
+    @Test
+    public void testValidateAndGetUriNullMap() {
+        Assertions.assertThrows(StoragePropertiesException.class, () ->
+                AzurePropertyUtils.validateAndGetUri(null));
+    }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 3569cd4e007..cf0bb46f3a3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -280,6 +280,9 @@ under the License.
         <!--The dependence of transitive dependence cannot be ruled out, only Saving the nation through twisted ways.-->
         <netty-3-test.version>3.10.6.Final</netty-3-test.version>
         <objenesis.version>2.1</objenesis.version>
+        <!-- Ensure JDK 17 compatibility by removing the dependency on the internal X509V1CertImpl class. -->
+        <wildfly-openssl.version>2.2.5.Final</wildfly-openssl.version>
+        <wildfly-common.version>2.0.1</wildfly-common.version>
         <!-- NOTE: Using grpc-java whose version is newer than 1.34.0 will break the build on CentOS 6 due to the obsolete GLIBC -->
         <grpc-java.version>1.34.0</grpc-java.version>
         <!--Need to ensure that the version is the same as in arrow/java/pom.xml or compatible with it.-->
@@ -1026,6 +1029,16 @@ under the License.
                 <artifactId>objenesis</artifactId>
                 <version>${objenesis.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.wildfly.openssl</groupId>
+                <artifactId>wildfly-openssl</artifactId>
+                <version>${wildfly-openssl.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.wildfly.common</groupId>
+                <artifactId>wildfly-common</artifactId>
+                <version>${wildfly-common.version}</version>
+            </dependency>
             <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
             <dependency>
                 <groupId>com.google.protobuf</groupId>
@@ -1192,6 +1205,11 @@ under the License.
                     </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-azure</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-aws</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to