This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6e35555a83d branch-3.1: [feat](storage)Support Azure Blob Storage #56861 (#57219)
6e35555a83d is described below
commit 6e35555a83d9b67a40ca5fbbe0110fae6ea28980
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Oct 30 17:44:54 2025 +0800
branch-3.1: [feat](storage)Support Azure Blob Storage #56861 (#57219)
cherry pick #56861
---------
Co-authored-by: Pxl <[email protected]>
---
be/src/io/file_factory.h | 2 +
be/src/io/fs/azure_obj_storage_client.cpp | 6 +-
fe/fe-core/pom.xml | 4 +
.../java/org/apache/doris/analysis/BrokerDesc.java | 2 +-
.../org/apache/doris/analysis/StorageBackend.java | 25 ++-
.../org/apache/doris/common/util/LocationPath.java | 15 ++
.../doris/datasource/hive/HMSTransaction.java | 61 ++++---
.../doris/datasource/hive/HiveMetaStoreCache.java | 5 +-
.../property/storage/AzureProperties.java | 53 ++++--
.../property/storage/AzurePropertyUtils.java | 189 +++++++++++++++++++++
.../property/storage/S3PropertyUtils.java | 8 +-
.../property/storage/StorageProperties.java | 1 +
.../java/org/apache/doris/fs/SchemaTypeMapper.java | 3 +-
.../org/apache/doris/fs/obj/AzureObjStorage.java | 95 ++++++++++-
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 2 +-
.../apache/doris/fs/remote/AzureFileSystem.java | 19 ++-
.../org/apache/doris/fs/remote/ObjFileSystem.java | 26 ++-
.../org/apache/doris/fs/remote/S3FileSystem.java | 24 +++
.../org/apache/doris/planner/HiveTableSink.java | 3 +-
.../org/apache/doris/planner/IcebergTableSink.java | 5 +-
.../property/storage/AzurePropertiesTest.java | 28 +--
.../property/storage/AzurePropertyUtilsTest.java | 145 ++++++++++++++++
fe/pom.xml | 18 ++
23 files changed, 635 insertions(+), 104 deletions(-)
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 603afdee17d..ae6c10cdec8 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -113,6 +113,8 @@ public:
return TFileType::FILE_LOCAL;
case TStorageBackendType::S3:
return TFileType::FILE_S3;
+ case TStorageBackendType::AZURE:
+ return TFileType::FILE_S3;
case TStorageBackendType::BROKER:
return TFileType::FILE_BROKER;
case TStorageBackendType::HDFS:
diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp
index ee4b8f7ac89..d27df50350f 100644
--- a/be/src/io/fs/azure_obj_storage_client.cpp
+++ b/be/src/io/fs/azure_obj_storage_client.cpp
@@ -43,6 +43,7 @@
#include "common/status.h"
#include "io/fs/obj_storage_client.h"
#include "util/bvar_helper.h"
+#include "util/coding.h"
#include "util/s3_util.h"
using namespace Azure::Storage::Blobs;
@@ -54,8 +55,9 @@ std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptio
}
auto base64_encode_part_num(int part_num) {
- return Aws::Utils::HashingUtils::Base64Encode(
- {reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
+ uint8_t buf[4];
+ doris::encode_fixed32_le(buf, static_cast<uint32_t>(part_num));
+ return Aws::Utils::HashingUtils::Base64Encode({buf, sizeof(buf)});
}
template <typename Func>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index e7ed575562e..7fced3d51d8 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -125,6 +125,10 @@ under the License.
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ </dependency>
<!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 24808b2f243..a9a782e80fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -93,7 +93,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
// Create primary storage properties from the given configuration
this.storageProperties = StorageProperties.createPrimary(this.properties);
// Override the storage type based on property configuration
- this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
+ this.storageType = StorageBackend.StorageType.valueOfIgnoreCase(storageProperties.getStorageName());
} catch (StoragePropertiesException e) {
// Currently ignored: these properties might be broker-specific.
// Just keep the storage type as BROKER, and try to create BrokerProperties
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 9fa2a1203bf..690f43e2f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -129,6 +129,16 @@ public class StorageBackend implements ParseNode {
return description;
}
+ public static StorageBackend.StorageType valueOfIgnoreCase(String name) {
+ for (StorageBackend.StorageType type : StorageBackend.StorageType.values()) {
+ if (type.name().equalsIgnoreCase(name)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("No enum constant "
+ + StorageBackend.StorageType.class.getCanonicalName() + "." + name);
+ }
+
public TStorageBackendType toThrift() {
switch (this) {
case S3:
@@ -162,21 +172,6 @@ public class StorageBackend implements ParseNode {
*/
public static final Set<StorageType> REFACTOR_STORAGE_TYPES =
ImmutableSet.of(StorageType.S3, StorageType.HDFS, StorageType.OFS, StorageType.JFS, StorageType.AZURE);
-
- public static StorageType convertToStorageType(String storageName) {
- switch (storageName.toLowerCase()) {
- case "hdfs":
- return StorageType.HDFS;
- case "s3":
- return StorageType.S3;
- case "jfs":
- return StorageType.JFS;
- case "local":
- return StorageType.LOCAL;
- default:
- throw new IllegalArgumentException("Invalid storage type:
" + storageName);
- }
- }
}
}
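Reviewer note (not part of the patch): the new valueOfIgnoreCase helper simply makes the storage-name lookup case-insensitive, which Enum.valueOf is not. A minimal, self-contained Java sketch of the behaviour, with a reduced stand-in enum instead of the real StorageBackend.StorageType:

    public final class ValueOfIgnoreCaseDemo {
        enum StorageType { S3, HDFS, AZURE, BROKER, LOCAL }

        // Mirrors the lookup added above: scan the constants, compare ignoring case.
        static StorageType valueOfIgnoreCase(String name) {
            for (StorageType type : StorageType.values()) {
                if (type.name().equalsIgnoreCase(name)) {
                    return type;
                }
            }
            throw new IllegalArgumentException(
                    "No enum constant " + StorageType.class.getCanonicalName() + "." + name);
        }

        public static void main(String[] args) {
            System.out.println(valueOfIgnoreCase("azure"));  // AZURE
            System.out.println(valueOfIgnoreCase("Azure"));  // AZURE
            // StorageType.valueOf("azure") would throw IllegalArgumentException instead.
        }
    }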
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 9830e627779..67aa9426d5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -185,6 +185,21 @@ public class LocationPath {
}
}
+ public static LocationPath of(String location,
+ StorageProperties storageProperties) {
+ try {
+ String schema = extractScheme(location);
+ String normalizedLocation = storageProperties.validateAndNormalizeUri(location);
+ String encodedLocation = encodedLocation(normalizedLocation);
+ URI uri = URI.create(encodedLocation);
+ String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://"
+ + Strings.nullToEmpty(uri.getAuthority());
+ return new LocationPath(schema, normalizedLocation, fsIdentifier, storageProperties);
+ } catch (UserException e) {
+ throw new StoragePropertiesException("Failed to create
LocationPath for location: " + location, e);
+ }
+ }
+
/**
* Extracts the URI scheme (e.g., "s3", "hdfs") from the location string.
*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index c151b36fcd3..4bd60ecb96c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -31,6 +31,7 @@ import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemUtil;
+import org.apache.doris.fs.remote.ObjFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.remote.SwitchingFileSystem;
@@ -61,8 +62,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.util.ArrayList;
@@ -230,7 +229,7 @@ public class HMSTransaction implements Transaction {
if (pu.getS3MpuPendingUploads() != null) {
for (TS3MPUPendingUpload s3MPUPendingUpload : pu.getS3MpuPendingUploads()) {
uncompletedMpuPendingUploads.add(
- new UncompletedMpuPendingUpload(s3MPUPendingUpload, pu.getLocation().getTargetPath()));
+ new UncompletedMpuPendingUpload(s3MPUPendingUpload, pu.getLocation().getWritePath()));
}
}
}
@@ -1198,7 +1197,7 @@ public class HMSTransaction implements Transaction {
tableAndMore.getFileNames());
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
- s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
+ objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
tableAndMore.hivePartitionUpdate, targetPath);
}
}
@@ -1242,7 +1241,7 @@ public class HMSTransaction implements Transaction {
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3cleanWhenSuccess.add(targetPath);
- s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
+ objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
tableAndMore.hivePartitionUpdate, targetPath);
}
}
@@ -1271,7 +1270,7 @@ public class HMSTransaction implements Transaction {
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
- s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
+ objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
@@ -1315,7 +1314,7 @@ public class HMSTransaction implements Transaction {
partitionAndMore.getFileNames());
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
- s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
+ objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
@@ -1399,7 +1398,7 @@ public class HMSTransaction implements Transaction {
} else {
if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
s3cleanWhenSuccess.add(targetPath);
- s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
+ objCommit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled,
partitionAndMore.hivePartitionUpdate, targetPath);
}
}
@@ -1621,21 +1620,35 @@ public class HMSTransaction implements Transaction {
summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
}
- private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
+ /**
+ * Commits object storage partition updates (e.g., for S3, Azure Blob, etc.).
+ *
+ * <p>In object storage systems, the write workflow is typically divided into two stages:
+ * <ul>
+ * <li><b>Upload (Stage) Phase</b> – Performed by the BE (Backend).
+ * During this phase, data parts (for S3) or staged blocks (for Azure) are uploaded to
+ * the storage system.</li>
+ * <li><b>Commit Phase</b> – Performed by the FE (Frontend).
+ * The FE is responsible for finalizing the uploads initiated by the BE:
+ * <ul>
+ * <li>For <b>S3</b>: the FE calls {@code completeMultipartUpload} to merge all uploaded parts into a
+ * single object.</li>
+ * <li>For <b>Azure Blob</b>: the BE stages blocks, and the FE performs the final commit to seal
+ * the blob.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <p>This method is executed by the FE and ensures that all uploads initiated by the BE
+ * are properly committed and finalized on the object storage side.
+ */
+ private void objCommit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {
List<TS3MPUPendingUpload> s3MpuPendingUploads = hivePartitionUpdate.getS3MpuPendingUploads();
if (isMockedPartitionUpdate) {
return;
}
-
- S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
- S3Client s3Client;
- try {
- s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
- } catch (UserException e) {
- throw new RuntimeException(e);
- }
-
+ ObjFileSystem fileSystem = (ObjFileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
if (fileSystemTaskCancelled.get()) {
@@ -1644,15 +1657,13 @@ public class HMSTransaction implements Transaction {
List<CompletedPart> completedParts = Lists.newArrayList();
for (Map.Entry<Integer, String> entry : s3MPUPendingUpload.getEtags().entrySet()) {
completedParts.add(CompletedPart.builder().eTag(entry.getValue()).partNumber(entry.getKey())
- .build());
+ .build());
}
- s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
- .bucket(s3MPUPendingUpload.getBucket())
- .key(s3MPUPendingUpload.getKey())
- .uploadId(s3MPUPendingUpload.getUploadId())
- .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
- .build());
+ fileSystem.completeMultipartUpload(s3MPUPendingUpload.getBucket(),
+ s3MPUPendingUpload.getKey(),
+ s3MPUPendingUpload.getUploadId(),
+ s3MPUPendingUpload.getEtags());
uncompletedMpuPendingUploads.remove(new UncompletedMpuPendingUpload(s3MPUPendingUpload, path));
}, fileSystemExecutor));
}
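Reviewer note (not part of the patch): a rough, self-contained sketch of the commit-phase pattern that objCommit above follows — one async finalize task per pending upload, dispatched through a storage-agnostic hook instead of a hard-coded S3Client. PendingUpload and ObjectFs below are illustrative stand-ins, not Doris classes.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicBoolean;

    class ObjCommitSketch {
        // Stand-in for the ObjFileSystem abstraction used by the patch.
        interface ObjectFs {
            void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> etags);
        }

        // Stand-in for TS3MPUPendingUpload: where the BE staged parts and their ETags.
        static class PendingUpload {
            String bucket;
            String key;
            String uploadId;
            Map<Integer, String> etags;
        }

        // One async finalize task per pending upload; skipped if the commit was cancelled.
        static void objCommit(Executor executor, List<CompletableFuture<?>> futures,
                              AtomicBoolean cancelled, List<PendingUpload> uploads, ObjectFs fs) {
            for (PendingUpload u : uploads) {
                futures.add(CompletableFuture.runAsync(() -> {
                    if (cancelled.get()) {
                        return;
                    }
                    fs.completeMultipartUpload(u.bucket, u.key, u.uploadId, u.etags);
                }, executor));
            }
        }
    }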
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d03593ab2ef..0c011c386ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -379,9 +379,8 @@ public class HiveMetaStoreCache {
if (status.ok()) {
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
- LocationPath remoteFileLocationPath = LocationPath.of(srcPath, catalog.getCatalogProperty()
- .getStoragePropertiesMap());
- result.addFile(remoteFile, remoteFileLocationPath);
+ LocationPath srcLocationPath = LocationPath.of(srcPath, locationPath.getStorageProperties());
+ result.addFile(remoteFile, srcLocationPath);
}
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
index 581ca235e31..38315f00077 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
@@ -24,6 +24,7 @@ import org.apache.doris.datasource.property.ConnectorProperty;
import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
+import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
@@ -64,18 +65,20 @@ public class AzureProperties extends StorageProperties {
@Getter
- @ConnectorProperty(names = {"azure.access_key", "s3.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"azure.account_name", "azure.access_key", "s3.access_key",
+ "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
description = "The access key of S3.")
- protected String accessKey = "";
+ protected String accountName = "";
@Getter
- @ConnectorProperty(names = {"azure.secret_key", "s3.secret_key", "AWS_SECRET_KEY", "secret_key"},
+ @ConnectorProperty(names = {"azure.account_key", "azure.secret_key", "s3.secret_key",
+ "AWS_SECRET_KEY", "secret_key"},
sensitive = true,
description = "The secret key of S3.")
- protected String secretKey = "";
+ protected String accountKey = "";
@Getter
- @ConnectorProperty(names = {"azure.bucket", "s3.bucket"},
+ @ConnectorProperty(names = {"container", "azure.bucket", "s3.bucket"},
required = false,
description = "The container of Azure blob.")
protected String container = "";
@@ -108,7 +111,7 @@ public class AzureProperties extends StorageProperties {
throw new IllegalArgumentException(String.format("Endpoint '%s' is not valid. It should end with '%s'.",
endpoint, AZURE_ENDPOINT_SUFFIX));
}
- this.endpoint = formatAzureEndpoint(endpoint, accessKey);
+ this.endpoint = formatAzureEndpoint(endpoint, accountName);
}
public static boolean guessIsMe(Map<String, String> origProps) {
@@ -133,8 +136,8 @@ public class AzureProperties extends StorageProperties {
Map<String, String> s3Props = new HashMap<>();
s3Props.put("AWS_ENDPOINT", endpoint);
s3Props.put("AWS_REGION", "dummy_region");
- s3Props.put("AWS_ACCESS_KEY", accessKey);
- s3Props.put("AWS_SECRET_KEY", secretKey);
+ s3Props.put("AWS_ACCESS_KEY", accountName);
+ s3Props.put("AWS_SECRET_KEY", accountKey);
s3Props.put("AWS_NEED_OVERRIDE_ENDPOINT", "true");
s3Props.put("provider", "azure");
s3Props.put("use_path_style", usePathStyle);
@@ -155,24 +158,46 @@ public class AzureProperties extends StorageProperties {
@Override
public String validateAndNormalizeUri(String url) throws UserException {
- return S3PropertyUtils.validateAndNormalizeUri(url, usePathStyle, forceParsingByStandardUrl);
+ return AzurePropertyUtils.validateAndNormalizeUri(url);
}
@Override
public String validateAndGetUri(Map<String, String> loadProps) throws UserException {
- return S3PropertyUtils.validateAndGetUri(loadProps);
+ return AzurePropertyUtils.validateAndGetUri(loadProps);
}
@Override
public String getStorageName() {
- return "Azure";
+ return "AZURE";
}
@Override
public void initializeHadoopStorageConfig() {
- // Azure does not require any special Hadoop configuration for S3 compatibility.
- // The properties are already set in the getBackendConfigProperties method.
- // This method will be removed in the future when FileIO is fully implemented.
+ hadoopStorageConfig = new Configuration();
+ //disable azure cache
+ // Disable all Azure ABFS/WASB FileSystem caching to ensure fresh instances per configuration
+ for (String scheme : new String[]{"abfs", "abfss", "wasb", "wasbs"}) {
+ hadoopStorageConfig.set("fs." + scheme + ".impl.disable.cache", "true");
+ }
+ origProps.forEach((k, v) -> {
+ if (k.startsWith("fs.azure.")) {
+ hadoopStorageConfig.set(k, v);
+ }
+ });
+ setAzureAccountKeys(hadoopStorageConfig, accountName, accountKey);
}
+
+ private static void setAzureAccountKeys(Configuration conf, String accountName, String accountKey) {
+ String[] endpoints = {
+ "dfs.core.windows.net",
+ "blob.core.windows.net"
+ };
+ for (String endpoint : endpoints) {
+ String key = String.format("fs.azure.account.key.%s.%s", accountName, endpoint);
+ conf.set(key, accountKey);
+ }
+ conf.set("fs.azure.account.key", accountKey);
+ }
+
}
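Reviewer note (not part of the patch): an illustrative sketch of the Hadoop Configuration entries that initializeHadoopStorageConfig/setAzureAccountKeys above end up writing. The account name and key here are made-up placeholders, not real credentials.

    import org.apache.hadoop.conf.Configuration;

    public class AzureHadoopConfSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            String accountName = "myaccount";          // placeholder
            String accountKey = "base64-account-key";  // placeholder, never a real key

            // Disable FileSystem caching for all Azure schemes, as the patch does.
            for (String scheme : new String[] {"abfs", "abfss", "wasb", "wasbs"}) {
                conf.set("fs." + scheme + ".impl.disable.cache", "true");
            }
            // Register the shared-key credential for both the DFS and Blob endpoints.
            for (String endpoint : new String[] {"dfs.core.windows.net", "blob.core.windows.net"}) {
                conf.set(String.format("fs.azure.account.key.%s.%s", accountName, endpoint), accountKey);
            }
            conf.set("fs.azure.account.key", accountKey);

            System.out.println(conf.get("fs.azure.account.key.myaccount.blob.core.windows.net"));
            System.out.println(conf.get("fs.abfss.impl.disable.cache"));
        }
    }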
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java
new file mode 100644
index 00000000000..8c986b74da0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class AzurePropertyUtils {
+
+ /**
+ * Validates and normalizes an Azure Blob Storage URI into a unified {@code s3://}-style format.
+ * <p>
+ * This method supports the following URI formats:
+ * <ul>
+ * <li>HDFS-style Azure URIs: {@code wasb://}, {@code wasbs://}, {@code abfs://}, {@code abfss://}</li>
+ * <li>HTTPS-style Azure Blob URLs: {@code https://<account>.blob.core.windows.net/<container>/<path>}</li>
+ * </ul>
+ * <p>
+ * The normalized output will always be in the form of:
+ * <pre>{@code
+ * s3://<container>/<path>
+ * }</pre>
+ * <p>
+ * Examples:
+ * <ul>
+ * <li>{@code wasbs://[email protected]/data/file.txt}
+ * → {@code s3://container/data/file.txt}</li>
+ * <li>{@code https://account.blob.core.windows.net/container/file.csv}
+ * → {@code s3://container/file.csv}</li>
+ * </ul>
+ *
+ * @param path the input Azure URI string to be validated and normalized
+ * @return a normalized {@code s3://}-style URI
+ * @throws StoragePropertiesException if the URI is blank, invalid, or unsupported
+ */
+ public static String validateAndNormalizeUri(String path) throws UserException {
+
+ if (StringUtils.isBlank(path)) {
+ throw new StoragePropertiesException("Path cannot be null or empty");
+ }
+
+ String lower = path.toLowerCase();
+
+ // Only accept Azure Blob Storage-related URI schemes
+ if (!(lower.startsWith("wasb://") || lower.startsWith("wasbs://")
+ || lower.startsWith("abfs://") || lower.startsWith("abfss://")
+ || lower.startsWith("https://") || lower.startsWith("http://")
+ || lower.startsWith("s3://"))) {
+ throw new StoragePropertiesException("Unsupported Azure URI
scheme: " + path);
+ }
+
+ return convertToS3Style(path);
+ }
+
+ /**
+ * Converts an Azure Blob Storage URI into a unified {@code s3://<container>/<path>} format.
+ * <p>
+ * This method recognizes both:
+ * <ul>
+ * <li>HDFS-style Azure URIs ({@code wasb://}, {@code wasbs://}, {@code abfs://}, {@code abfss://})</li>
+ * <li>HTTPS-style Azure Blob URLs ({@code https://<account>.blob.core.windows.net/...})</li>
+ * </ul>
+ * <p>
+ * It throws an exception if the URI is invalid or does not match Azure Blob Storage patterns.
+ *
+ * @param uri the original Azure URI string
+ * @return the normalized {@code s3://<container>/<path>} string
+ * @throws StoragePropertiesException if the URI is invalid or unsupported
+ */
+ private static String convertToS3Style(String uri) {
+ if (StringUtils.isBlank(uri)) {
+ throw new StoragePropertiesException("URI is blank");
+ }
+
+ String lowerUri = uri.toLowerCase();
+ if (lowerUri.startsWith("s3://")) {
+ return lowerUri;
+ }
+ // Handle Azure HDFS-style URIs (wasb://, wasbs://, abfs://, abfss://)
+ if (lowerUri.startsWith("wasb://") || lowerUri.startsWith("wasbs://")
+ || lowerUri.startsWith("abfs://") || lowerUri.startsWith("abfss://")) {
+
+ // Example: wasbs://[email protected]/path/file.txt
+ String schemeRemoved = uri.replaceFirst("^[a-z]+s?://", "");
+ int atIndex = schemeRemoved.indexOf('@');
+ if (atIndex < 0) {
+ throw new StoragePropertiesException("Invalid Azure URI, missing '@': " + uri);
+ }
+
+ // Extract container name (before '@')
+ String container = schemeRemoved.substring(0, atIndex);
+
+ // Extract remaining part after '@'
+ String remainder = schemeRemoved.substring(atIndex + 1);
+ int slashIndex = remainder.indexOf('/');
+
+ // Extract the path part if it exists
+ String path = (slashIndex != -1) ? remainder.substring(slashIndex + 1) : "";
+
+ // Normalize to s3-style URI: s3://<container>/<path>
+ return StringUtils.isBlank(path)
+ ? String.format("s3://%s", container)
+ : String.format("s3://%s/%s", container, path);
+ }
+
+ // ② Handle HTTPS/HTTP Azure Blob Storage URLs
+ if (lowerUri.startsWith("https://") || lowerUri.startsWith("http://")) {
+ try {
+ URI parsed = new URI(uri);
+ String host = parsed.getHost();
+ String path = parsed.getPath();
+
+ if (StringUtils.isBlank(host)) {
+ throw new StoragePropertiesException("Invalid Azure HTTPS
URI, missing host: " + uri);
+ }
+
+ // Typical Azure Blob domain: <account>.blob.core.windows.net
+ if (!host.contains(".blob.core.windows.net")) {
+ throw new StoragePropertiesException("Not an Azure Blob
URL: " + uri);
+ }
+
+ // Path usually looks like: /<container>/<path>
+ String[] parts = path.split("/", 3);
+ if (parts.length < 2) {
+ throw new StoragePropertiesException("Invalid Azure Blob
URL, missing container: " + uri);
+ }
+
+ String container = parts[1];
+ String remainder = (parts.length == 3) ? parts[2] : "";
+
+ // Convert HTTPS URL to s3-style format
+ return StringUtils.isBlank(remainder)
+ ? String.format("s3://%s", container)
+ : String.format("s3://%s/%s", container, remainder);
+
+ } catch (URISyntaxException e) {
+ throw new StoragePropertiesException("Invalid HTTPS URI: " +
uri, e);
+ }
+ }
+
+ throw new StoragePropertiesException("Unsupported Azure URI scheme: "
+ uri);
+ }
+
+ /**
+ * Extracts and validates the "uri" entry from a properties map.
+ *
+ * <p>Example:
+ * <pre>
+ * Input : {"uri": "wasb://[email protected]/dir/file.txt"}
+ * Output: "wasb://[email protected]/dir/file.txt"
+ * </pre>
+ *
+ * @param props the configuration map expected to contain a "uri" key
+ * @return the URI string from the map
+ * @throws StoragePropertiesException if the map is empty or missing the "uri" key
+ */
+ public static String validateAndGetUri(Map<String, String> props) {
+ if (props == null || props.isEmpty()) {
+ throw new StoragePropertiesException("Properties map cannot be
null or empty");
+ }
+
+ return props.entrySet().stream()
+ .filter(e -> StorageProperties.URI_KEY.equalsIgnoreCase(e.getKey()))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElseThrow(() -> new StoragePropertiesException("Properties
must contain 'uri' key"));
+ }
+}
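Reviewer note (not part of the patch): a short usage sketch of the new utility (the demo class name is illustrative). The expected outputs follow the normalization rules documented above and match the AzurePropertyUtilsTest cases added later in this diff.

    import org.apache.doris.datasource.property.storage.AzurePropertyUtils;

    public class AzureUriNormalizeDemo {
        public static void main(String[] args) throws Exception {
            // HDFS-style Azure URI -> unified s3:// form
            System.out.println(AzurePropertyUtils.validateAndNormalizeUri(
                    "wasbs://[email protected]/data/file.txt"));
            // prints: s3://container/data/file.txt

            // HTTPS-style Blob URL -> unified s3:// form
            System.out.println(AzurePropertyUtils.validateAndNormalizeUri(
                    "https://account.blob.core.windows.net/container/data/file.parquet"));
            // prints: s3://container/data/file.parquet
        }
    }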
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
index 49c7c5612a7..99064b4e2e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
@@ -33,8 +33,6 @@ import java.util.Optional;
public class S3PropertyUtils {
private static final Logger LOG = LogManager.getLogger(S3PropertyUtils.class);
- private static final String URI_KEY = "uri";
-
/**
* Constructs the S3 endpoint from a given URI in the props map.
*
@@ -51,7 +49,7 @@ public class S3PropertyUtils {
String stringUsePathStyle,
String stringForceParsingByStandardUri) {
Optional<String> uriOptional = props.entrySet().stream()
- .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .filter(e -> e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
.map(Map.Entry::getValue)
.findFirst();
@@ -90,7 +88,7 @@ public class S3PropertyUtils {
String stringUsePathStyle,
String stringForceParsingByStandardUri) {
Optional<String> uriOptional = props.entrySet().stream()
- .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .filter(e -> e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
.map(Map.Entry::getValue)
.findFirst();
@@ -160,7 +158,7 @@ public class S3PropertyUtils {
throw new StoragePropertiesException("props is empty");
}
Optional<String> uriOptional = props.entrySet().stream()
- .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .filter(e -> e.getKey().equalsIgnoreCase(StorageProperties.URI_KEY))
.map(Map.Entry::getValue)
.findFirst();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 2ce87f9ffb9..ee75bc3b63d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -47,6 +47,7 @@ public abstract class StorageProperties extends ConnectionProperties {
public static final String FS_OSS_HDFS_SUPPORT = "fs.oss-hdfs.support";
public static final String FS_LOCAL_SUPPORT = "fs.local.support";
public static final String DEPRECATED_OSS_HDFS_SUPPORT = "oss.hdfs.enabled";
+ protected static final String URI_KEY = "uri";
public static final String FS_PROVIDER_KEY = "provider";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
index 03240f74367..a89e8be1ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SchemaTypeMapper.java
@@ -72,9 +72,10 @@ public enum SchemaTypeMapper {
* This implementation allows only "abfss" and "wasbs" schemes, which operate over HTTPS.
* Insecure or deprecated schemes such as "abfs", "wasb", and "adl" are explicitly unsupported.
* */
+ ABFS("abfs", StorageProperties.Type.AZURE, FileSystemType.S3, TFileType.FILE_S3),
ABFSS("abfss", StorageProperties.Type.AZURE, FileSystemType.S3, TFileType.FILE_S3),
+ WASB("wasb", StorageProperties.Type.AZURE, FileSystemType.S3, TFileType.FILE_S3),
WASBS("wasbs", StorageProperties.Type.AZURE, FileSystemType.S3, TFileType.FILE_S3),
- AZURE("azure", StorageProperties.Type.AZURE, FileSystemType.S3, TFileType.FILE_S3),
HDFS("hdfs", StorageProperties.Type.HDFS, FileSystemType.HDFS, TFileType.FILE_HDFS),
LOCAL("local", StorageProperties.Type.HDFS, FileSystemType.HDFS, TFileType.FILE_HDFS);
//LAKEFS("lakefs", StorageProperties.Type.LAKEFS),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
index 17f2d3b3439..3e014f3db3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -29,6 +29,7 @@ import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
+import com.azure.core.util.IterableStream;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
@@ -38,11 +39,13 @@ import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
@@ -52,6 +55,8 @@ import org.jetbrains.annotations.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
@@ -59,7 +64,10 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
private static final Logger LOG = LogManager.getLogger(AzureObjStorage.class);
@@ -94,8 +102,8 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
@Override
public BlobServiceClient getClient() throws UserException {
if (client == null) {
- StorageSharedKeyCredential cred = new StorageSharedKeyCredential(azureProperties.getAccessKey(),
- azureProperties.getSecretKey());
+ StorageSharedKeyCredential cred = new StorageSharedKeyCredential(azureProperties.getAccountName(),
+ azureProperties.getAccountKey());
BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
builder.credential(cred);
builder.endpoint(azureProperties.getEndpoint());
@@ -258,6 +266,22 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
}
}
+ public void completeMultipartUpload(String bucket, String key, Map<Integer, String> parts) {
+ BlockBlobClient blockBlobClient;
+ try {
+ blockBlobClient = getClient().getBlobContainerClient(bucket).getBlobClient(key).getBlockBlobClient();
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
+ List<String> blockIds = parts.keySet().stream()
+ .map(k -> Base64.getEncoder()
+ .encodeToString(ByteBuffer.allocate(4)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .putInt(k)
+ .array())).collect(Collectors.toList());
+ blockBlobClient.commitBlockList(blockIds);
+ }
+
@Override
public RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException {
try {
@@ -294,6 +318,28 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
return path;
}
+ public Status listDirectories(String remotePath, Set<String> result) {
+ try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String key = uri.getKey();
+ String schemaAndBucket = remotePath.substring(0, remotePath.length() - key.length());
+ String prefix = key.endsWith("/") ? key : key + "/";
+ BlobContainerClient containerClient = getClient().getBlobContainerClient(bucket);
+ IterableStream<BlobItem> blobs = containerClient.listBlobsByHierarchy(prefix);
+ for (BlobItem blobItem : blobs) {
+ if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+ String path = S3ObjStorage.toPath(schemaAndBucket, blobItem.getName()).toString();
+ result.add(path);
+ }
+ }
+ return Status.OK;
+ } catch (Exception e) {
+ throw new RuntimeException("Azure FileSystem list directories
failed: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
+ }
+ }
+
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
long roundCnt = 0;
long elementCnt = 0;
@@ -330,7 +376,7 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
java.nio.file.Path blobPath = Paths.get(blobItem.getName());
boolean isPrefix = false;
- while (blobPath.normalize().toString().startsWith(listPrefix)) {
+ while (null != blobPath && blobPath.normalize().toString().startsWith(listPrefix)) {
if (LOG.isDebugEnabled()) {
LOG.debug("get blob {}", blobPath.normalize().toString());
}
@@ -381,13 +427,51 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
return st;
}
+ public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
+ try {
+ S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+ String bucket = uri.getBucket();
+ String key = uri.getKey();
+ String schemaAndBucket = remotePath.substring(0, remotePath.length() - key.length());
+
+ String prefix = key.endsWith("/") ? key : key + "/";
+ BlobContainerClient containerClient = getClient().getBlobContainerClient(bucket);
+ IterableStream<BlobItem> blobs = containerClient.listBlobsByHierarchy(prefix);
+
+ for (BlobItem blobItem : blobs) {
+ if (Boolean.TRUE.equals(blobItem.isPrefix())) {
+ if (recursive) {
+ String path = S3ObjStorage.toPath(schemaAndBucket, blobItem.getName()).toString();
+ Status status = listFiles(path, recursive, result);
+ if (status != Status.OK) {
+ return status;
+ }
+ }
+ } else {
+ BlobItemProperties props = blobItem.getProperties();
+ RemoteFile file = new RemoteFile(
+ S3ObjStorage.toPath(schemaAndBucket, blobItem.getName()),
+ false,
+ props.getContentLength(),
+ props.getContentLength(),
+ props.getLastModified().getSecond(),
+ null);
+ result.add(file);
+ }
+ }
+ return Status.OK;
+ } catch (Exception e) {
+ throw new RuntimeException("Azure FileSystem list files failed: "
+ + ExceptionUtils.getRootCauseMessage(e), e);
+ }
+ }
+
public PagedResponse<BlobItem> getPagedBlobItems(BlobContainerClient client, ListBlobsOptions options,
String newContinuationToken) {
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
return pagedBlobs.iterableByPage().iterator().next();
}
-
public Status multipartUpload(String remotePath, @Nullable InputStream inputStream, long totalBytes) {
Status st = Status.OK;
long uploadedBytes = 0;
@@ -427,6 +511,7 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
@Override
public void close() throws Exception {
// Create a BlobServiceClient instance (thread-safe and reusable).
- // Note: BlobServiceClient does NOT implement Closeable and does not require explicit closing.
+ // Note: BlobServiceClient does NOT implement Closeable and does not require explicit closing.
}
+
}
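Reviewer note (not part of the patch): the block IDs the FE commits in completeMultipartUpload above must match the ones the BE generated when staging the blocks. Both sides derive them from the part number, encoded as a 4-byte little-endian integer and then Base64-encoded (the encode_fixed32_le change in azure_obj_storage_client.cpp earlier in this diff is the BE half). A small self-contained sketch of the encoding:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Base64;

    public class BlockIdDemo {
        // Part number -> Azure block ID: little-endian 4 bytes, then Base64.
        static String blockId(int partNumber) {
            byte[] raw = ByteBuffer.allocate(4)
                    .order(ByteOrder.LITTLE_ENDIAN)
                    .putInt(partNumber)
                    .array();
            return Base64.getEncoder().encodeToString(raw);
        }

        public static void main(String[] args) {
            System.out.println(blockId(1)); // AQAAAA==
            System.out.println(blockId(2)); // AgAAAA==
        }
    }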
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 238072df3c0..1ae1f32319f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -188,7 +188,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
return Status.OK;
}
- private Path toPath(String schemaAndBucket, String key) {
+ protected static Path toPath(String schemaAndBucket, String key) {
// Ensure inputs are not null
if (schemaAndBucket == null) {
schemaAndBucket = "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
index e9da3b4ee76..c568b6b6f9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
@@ -23,12 +23,16 @@ import org.apache.doris.datasource.property.storage.AzureProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.obj.AzureObjStorage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class AzureFileSystem extends ObjFileSystem {
-
+ private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class);
private final AzureProperties azureProperties;
public AzureFileSystem(AzureProperties azureProperties) {
@@ -44,7 +48,8 @@ public class AzureFileSystem extends ObjFileSystem {
@Override
public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
- throw new UnsupportedOperationException("Listing files is not supported in Azure File System.");
+ AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+ return azureObjStorage.listFiles(remotePath, recursive, result);
}
@Override
@@ -55,7 +60,8 @@ public class AzureFileSystem extends ObjFileSystem {
@Override
public Status listDirectories(String remotePath, Set<String> result) {
- throw new UnsupportedOperationException("Listing directories is not
supported in Azure File System.");
+ AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+ return azureObjStorage.listDirectories(remotePath, result);
}
@Override
@@ -73,4 +79,11 @@ public class AzureFileSystem extends ObjFileSystem {
}
}
}
+
+ @Override
+ public void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> parts) {
+ AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+ azureObjStorage.completeMultipartUpload(bucket, key, parts);
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
index 989b1632ec4..f04fb6fa01b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
@@ -35,6 +35,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.Map;
public abstract class ObjFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(ObjFileSystem.class);
@@ -62,9 +63,10 @@ public abstract class ObjFileSystem extends RemoteFileSystem {
/**
* download data from remote file and check data size with expected file size.
+ *
* @param remoteFilePath remote file path
- * @param localFilePath local file path
- * @param fileSize download data size
+ * @param localFilePath local file path
+ * @param fileSize download data size
* @return
*/
@Override
@@ -162,4 +164,24 @@ public abstract class ObjFileSystem extends RemoteFileSystem {
public Status deleteDirectory(String absolutePath) {
return objStorage.deleteObjects(absolutePath);
}
+
+
+ /**
+ * Completes a multipart upload operation.
+ *
+ * <p>In object storage systems, large files are often uploaded in multiple parts.
+ * Once all parts have been successfully uploaded, this method is called to merge
+ * them into a single finalized object.
+ *
+ * <p>The main purpose of this method is to notify the underlying storage service
+ * to perform the final merge and make the object available for normal access.
+ *
+ * @param bucket The name of the target bucket.
+ * @param key The full object key (path) within the bucket.
+ * @param uploadId The unique identifier of the multipart upload session.
+ * @param parts A mapping of part numbers to their corresponding ETag values,
+ * used to assemble the parts in the correct order.
+ */
+ public abstract void completeMultipartUpload(String bucket, String key,
+ String uploadId, Map<Integer, String> parts);
}
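Reviewer note (not part of the patch): from a caller's point of view, the new hook is used roughly as in the sketch below. The class name, bucket, key, upload ID and ETag values are placeholder examples, and the file-system instance is assumed to be an existing S3FileSystem or AzureFileSystem created elsewhere.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.doris.fs.remote.ObjFileSystem;

    public class CompleteUploadCaller {
        // After the BE has uploaded/staged the parts, the FE finalizes them here.
        static void finalizeUpload(ObjFileSystem fs) {
            Map<Integer, String> etags = new LinkedHashMap<>();
            etags.put(1, "etag-of-part-1");  // placeholder ETag values
            etags.put(2, "etag-of-part-2");
            // S3: completes the multipart upload with these parts.
            // Azure: the uploadId is ignored and the staged blocks are committed.
            fs.completeMultipartUpload("my-bucket", "warehouse/db/tbl/file.parquet",
                    "example-upload-id", etags);
        }
    }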
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index c3608b7b35f..1925dc86b8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -28,10 +28,15 @@ import org.apache.doris.fs.obj.S3ObjStorage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class S3FileSystem extends ObjFileSystem {
@@ -108,4 +113,23 @@ public class S3FileSystem extends ObjFileSystem {
}
}
}
+
+ @Override
+ public void completeMultipartUpload(String bucket, String key, String uploadId, Map<Integer, String> parts) {
+ S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
+ List<CompletedPart> completedParts = new ArrayList<>();
+ for (Map.Entry<Integer, String> entry : parts.entrySet()) {
+ completedParts.add(CompletedPart.builder()
+ .partNumber(entry.getKey())
+ .eTag(entry.getValue())
+ .build());
+ }
+
+ objStorage.getClient().completeMultipartUpload(CompleteMultipartUploadRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .uploadId(uploadId)
+ .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
+ .build());
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index a1957fbc906..68a0edc430f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -124,12 +124,13 @@ public class HiveTableSink extends BaseExternalTableDataSink {
setSerDeProperties(tSink);
THiveLocationParams locationParams = new THiveLocationParams();
+ String originalLocation = sd.getLocation();
LocationPath locationPath = LocationPath.of(sd.getLocation(), targetTable.getStoragePropertiesMap());
String location = sd.getLocation();
TFileType fileType = locationPath.getTFileTypeForBE();
if (fileType == TFileType.FILE_S3) {
locationParams.setWritePath(locationPath.getNormalizedLocation());
- locationParams.setOriginalWritePath(locationPath.getNormalizedLocation());
+ locationParams.setOriginalWritePath(originalLocation);
locationParams.setTargetPath(locationPath.getNormalizedLocation());
if (insertCtx.isPresent()) {
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 6f53c8b0228..c536844c649 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -148,9 +148,10 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
tSink.setHadoopConfig(props);
// location
- LocationPath locationPath = LocationPath.of(IcebergUtils.dataLocation(icebergTable), storagePropertiesMap);
+ String originalLocation = IcebergUtils.dataLocation(icebergTable);
+ LocationPath locationPath = LocationPath.of(originalLocation, storagePropertiesMap);
tSink.setOutputPath(locationPath.toStorageLocation().toString());
- tSink.setOriginalOutputPath(locationPath.getPath().toString());
+ tSink.setOriginalOutputPath(originalLocation);
TFileType fileType = locationPath.getTFileTypeForBE();
tSink.setFileType(fileType);
if (fileType.equals(TFileType.FILE_BROKER)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
index fd36521a828..7542bdbce29 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java
@@ -50,11 +50,11 @@ public class AzurePropertiesTest {
// Verify if the properties are correctly parsed
Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net",
azureProperties.getEndpoint());
- Assertions.assertEquals("myAzureAccessKey",
azureProperties.getAccessKey());
- Assertions.assertEquals("myAzureSecretKey",
azureProperties.getSecretKey());
+ Assertions.assertEquals("myAzureAccessKey",
azureProperties.getAccountName());
+ Assertions.assertEquals("myAzureSecretKey",
azureProperties.getAccountKey());
Assertions.assertEquals("false", azureProperties.getUsePathStyle());
Assertions.assertEquals("false",
azureProperties.getForceParsingByStandardUrl());
- Assertions.assertEquals("Azure", azureProperties.getStorageName());
+ Assertions.assertEquals("AZURE", azureProperties.getStorageName());
}
// Test for missing access_key configuration, should throw an exception
@@ -98,26 +98,6 @@ public class AzurePropertiesTest {
}
- // Test for path style when use_path_style is false
- @Test
- public void testPathStyleCombinations() throws Exception {
- origProps.put("s3.endpoint",
"https://mystorageaccount.blob.core.windows.net");
- origProps.put("s3.access_key", "a");
- origProps.put("s3.secret_key", "b");
- origProps.put("provider", "azure");
-
- // By default, use_path_style is false
- AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
- Assertions.assertEquals("s3://mystorageaccount/mycontainer/blob.txt",
- azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
-
- // Set use_path_style to true
- origProps.put("use_path_style", "true");
- azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps);
- Assertions.assertEquals("s3://mycontainer/blob.txt",
- azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
- }
-
@Test
public void testParsingUri() throws Exception {
origProps.put("s3.endpoint",
"https://mystorageaccount.blob.core.windows.net");
@@ -136,7 +116,7 @@ public class AzurePropertiesTest {
Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt",
azureProperties.validateAndGetUri(origProps));
azureProperties.setUsePathStyle("false");
- Assertions.assertEquals("s3://mystorageaccount/mycontainer/blob.txt",
+ Assertions.assertEquals("s3://mycontainer/blob.txt",
azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java
new file mode 100644
index 00000000000..dc6eb8ad74c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AzurePropertyUtilsTest {
+
+ // ---------- validateAndNormalizeUri Tests ----------
+
+ @Test
+ public void testWasbsUri() throws Exception {
+ String input = "wasbs://[email protected]/data/file.txt";
+ String expected = "s3://container/data/file.txt";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testS3Uri() throws Exception {
+ String input = "s3://container/data/file.txt";
+ String expected = "s3://container/data/file.txt";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testAbfssUriWithoutPath() throws Exception {
+ String input = "abfss://[email protected]";
+ String expected = "s3://container";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testHttpUri() throws Exception {
+ String input = "http://account.blob.core.windows.net/container/folder/file.csv";
+ String expected = "s3://container/folder/file.csv";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testHttpsUriWithoutPath() throws Exception {
+ String input = "https://account.blob.core.windows.net/container";
+ String expected = "s3://container";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testHttpsUriWithPath() throws Exception {
+ String input = "https://account.blob.core.windows.net/container/data/file.parquet";
+ String expected = "s3://container/data/file.parquet";
+ Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testInvalidAzureScheme() {
+ String input = "ftp://[email protected]/data/file.txt";
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testMissingAtInWasbUri() {
+ String input = "wasb://container/account.blob.core.windows.net/data";
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testHttpsUriMissingHost() {
+ String input = "https:///container/file.txt"; // missing host
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testHttpsUriNotAzureBlob() {
+ String input = "https://account.otherdomain.com/container/file.txt";
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndNormalizeUri(input));
+ }
+
+ @Test
+ public void testBlankUri() {
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndNormalizeUri(" "));
+ }
+
+ // ---------- validateAndGetUri Tests ----------
+
+ @Test
+ public void testValidateAndGetUriNormal() {
+ Map<String, String> props = new HashMap<>();
+ props.put("uri",
"wasbs://[email protected]/data/file.txt");
+
Assertions.assertEquals("wasbs://[email protected]/data/file.txt",
+ AzurePropertyUtils.validateAndGetUri(props));
+ }
+
+ @Test
+ public void testValidateAndGetUriCaseInsensitive() {
+ Map<String, String> props = new HashMap<>();
+ props.put("URI",
"wasbs://[email protected]/data/file.txt");
+
Assertions.assertEquals("wasbs://[email protected]/data/file.txt",
+ AzurePropertyUtils.validateAndGetUri(props));
+ }
+
+ @Test
+ public void testValidateAndGetUriMissing() {
+ Map<String, String> props = new HashMap<>();
+ props.put("path", "value");
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndGetUri(props));
+ }
+
+ @Test
+ public void testValidateAndGetUriEmptyMap() {
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndGetUri(new HashMap<>()));
+ }
+
+ @Test
+ public void testValidateAndGetUriNullMap() {
+ Assertions.assertThrows(StoragePropertiesException.class, () ->
+ AzurePropertyUtils.validateAndGetUri(null));
+ }
+}
diff --git a/fe/pom.xml b/fe/pom.xml
index 3569cd4e007..cf0bb46f3a3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -280,6 +280,9 @@ under the License.
<!--The dependence of transitive dependence cannot be ruled out, only Saving the nation through twisted ways.-->
<netty-3-test.version>3.10.6.Final</netty-3-test.version>
<objenesis.version>2.1</objenesis.version>
+ <!--ensure JDK 17 compatibility by removing dependency on internal X509V1CertImpl class-->
+ <wildfly-openssl.version>2.2.5.Final</wildfly-openssl.version>
+ <wildfly-common.version>2.0.1</wildfly-common.version>
<!-- NOTE: Using grpc-java whose version is newer than 1.34.0 will break the build on CentOS 6 due to the obsolete GLIBC -->
<grpc-java.version>1.34.0</grpc-java.version>
<!--Need to ensure that the version is the same as in arrow/java/pom.xml or compatible with it.-->
@@ -1026,6 +1029,16 @@ under the License.
<artifactId>objenesis</artifactId>
<version>${objenesis.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.wildfly.openssl</groupId>
+ <artifactId>wildfly-openssl</artifactId>
+ <version>${wildfly-openssl.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wildfly.common</groupId>
+ <artifactId>wildfly-common</artifactId>
+ <version>${wildfly-common.version}</version>
+ </dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -1192,6 +1205,11 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>