This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 922c7c7639c591fde744fee532a0e0f977b2da5e
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Thu Jul 4 23:21:46 2024 +0800

    [enhance](Azure) Use s3Uri to specify the object's bucket and key for azure 
in FE (#37308)
    
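    Previously, when using S3 load on Azure Blob Storage, users had to specify
    the `s3.bucket` property, even though the bucket (container) can already be
    derived from the data infile URI. With this change, FE parses the bucket and
    object key from the URI (via S3URI) instead of relying on `s3.bucket`.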
---
 .../org/apache/doris/fs/obj/AzureObjStorage.java   | 154 ++++++++++++++++++---
 .../apache/doris/fs/remote/AzureFileSystem.java    |  74 +---------
 2 files changed, 138 insertions(+), 90 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
index 2d55f7dd477..358b66b44b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -20,7 +20,10 @@ package org.apache.doris.fs.obj;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3URI;
+import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.remote.RemoteFile;
 
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.http.rest.PagedResponse;
@@ -28,7 +31,8 @@ import com.azure.core.http.rest.Response;
 import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.batch.BlobBatch;
 import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.batch.BlobBatchClientBuilder;
@@ -45,23 +49,41 @@ import org.jetbrains.annotations.Nullable;
 
 import java.io.File;
 import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
+public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
     private static final Logger LOG = 
LogManager.getLogger(AzureObjStorage.class);
+    private static final String URI_TEMPLATE = 
"https://%s.blob.core.windows.net";;
     protected Map<String, String> properties;
-    private BlobContainerClient client;
+    private BlobServiceClient client;
+    private boolean isUsePathStyle = false;
 
-    private static final String URI_TEMPLATE = 
"https://%s.blob.core.windows.net/%s";;
+    private boolean forceParsingByStandardUri = false;
 
     public AzureObjStorage(Map<String, String> properties) {
         this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
         setProperties(properties);
     }
 
+    // To ensure compatibility with S3 usage, the path passed by the user 
still starts with 'S3://${containerName}'.
+    // For Azure, we need to remove this part.
+    private static String removeUselessSchema(String remotePath) {
+        String prefix = "s3://";
+
+        if (remotePath.startsWith(prefix)) {
+            remotePath = remotePath.substring(prefix.length());
+        }
+        // Remove the useless container name
+        int firstSlashIndex = remotePath.indexOf('/');
+        return remotePath.substring(firstSlashIndex + 1);
+    }
+
     public Map<String, String> getProperties() {
         return properties;
     }
@@ -73,16 +95,29 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
         } catch (DdlException e) {
             throw new IllegalArgumentException(e);
         }
+        // Virtual hosted-style is recommended in the s3 protocol.
+        // The path-style has been abandoned, but for some unexplainable 
reasons,
+        // the s3 client will determine whether the endpiont starts with `s3`
+        // when generating a virtual hosted-sytle request.
+        // If not, it will not be converted ( 
https://github.com/aws/aws-sdk-java-v2/pull/763),
+        // but the endpoints of many cloud service providers for object 
storage do not start with s3,
+        // so they cannot be converted to virtual hosted-sytle.
+        // Some of them, such as aliyun's oss, only support virtual 
hosted-style,
+        // and some of them(ceph) may only support
+        // path-style, so we need to do some additional conversion.
+        isUsePathStyle = 
this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
+                .equalsIgnoreCase("true");
+        forceParsingByStandardUri = 
this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI,
+                "false").equalsIgnoreCase("true");
     }
 
     @Override
-    public BlobContainerClient getClient() throws UserException {
+    public BlobServiceClient getClient() throws UserException {
         if (client == null) {
-            String containerName = properties.get(S3Properties.BUCKET);
-            String uri = String.format(URI_TEMPLATE, 
properties.get(S3Properties.ACCESS_KEY), containerName);
+            String uri = String.format(URI_TEMPLATE, 
properties.get(S3Properties.ACCESS_KEY));
             StorageSharedKeyCredential cred = new 
StorageSharedKeyCredential(properties.get(S3Properties.ACCESS_KEY),
                     properties.get(S3Properties.SECRET_KEY));
-            BlobContainerClientBuilder builder = new 
BlobContainerClientBuilder();
+            BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
             builder.credential(cred);
             builder.endpoint(uri);
             client = builder.buildClient();
@@ -97,8 +132,9 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
 
     @Override
     public Status headObject(String remotePath) {
-        BlobClient blobClient = client.getBlobClient(remotePath);
         try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobClient blobClient = 
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             BlobProperties properties = blobClient.getProperties();
             LOG.info("head file {} success: {}", remotePath, 
properties.toString());
             return Status.OK;
@@ -110,13 +146,17 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
                 return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
                         + remotePath + " failed: " + e.getMessage());
             }
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status getObject(String remoteFilePath, File localFile) {
         try {
-            BlobClient blobClient = client.getBlobClient(remoteFilePath);
+            S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobClient blobClient = 
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             BlobProperties properties = 
blobClient.downloadToFile(localFile.getAbsolutePath());
             LOG.info("get file " + remoteFilePath + " success: " + 
properties.toString());
             return Status.OK;
@@ -124,26 +164,34 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "get file from azure error: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "getObject "
+                    + remoteFilePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status putObject(String remotePath, @Nullable InputStream content, 
long contentLength) {
         try {
-            BlobClient blobClient = client.getBlobClient(remotePath);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobClient blobClient = 
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             blobClient.upload(content, contentLength);
             return Status.OK;
         } catch (BlobStorageException e) {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "Error occurred while copying the blob:: " + 
e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "putObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status deleteObject(String remotePath) {
         try {
-            BlobClient blobClient = client.getBlobClient(remotePath);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobClient blobClient = 
getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             blobClient.delete();
             LOG.info("delete file " + remotePath + " success");
             return Status.OK;
@@ -151,12 +199,18 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "get file from azure error: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "deleteObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status deleteObjects(String remotePath) {
         try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobContainerClient blobClient = 
getClient().getBlobContainerClient(uri.getBucket());
+            String containerUrl = blobClient.getBlobContainerUrl();
             String continuationToken = "";
             boolean isTruncated = false;
             long totalObjects = 0;
@@ -165,11 +219,11 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
                 List<RemoteObject> objectList = objects.getObjectList();
                 if (!objectList.isEmpty()) {
                     BlobBatchClient blobBatchClient = new 
BlobBatchClientBuilder(
-                            client.getServiceClient()).buildClient();
+                            getClient()).buildClient();
                     BlobBatch blobBatch = blobBatchClient.getBlobBatch();
 
                     for (RemoteObject blob : objectList) {
-                        blobBatch.deleteBlob(client.getBlobContainerUrl(), 
blob.getKey());
+                        blobBatch.deleteBlob(containerUrl, blob.getKey());
                     }
                     Response<Void> resp = 
blobBatchClient.submitBatchWithResponse(blobBatch, true, null, Context.NONE);
                     LOG.info("{} objects deleted for dir {} return http code 
{}",
@@ -193,15 +247,22 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
     @Override
     public Status copyObject(String origFilePath, String destFilePath) {
         try {
-            BlobClient sourceBlobClient = client.getBlobClient(origFilePath);
-            BlobClient destinationBlobClient = 
client.getBlobClient(destFilePath);
+            S3URI origUri = S3URI.create(origFilePath, isUsePathStyle, 
forceParsingByStandardUri);
+            S3URI destUri = S3URI.create(destFilePath, isUsePathStyle, 
forceParsingByStandardUri);
+            BlobClient sourceBlobClient = 
getClient().getBlobContainerClient(origUri.getBucket())
+                    .getBlobClient(origUri.getKey());
+            BlobClient destinationBlobClient = 
getClient().getBlobContainerClient(destUri.getBucket())
+                    .getBlobClient(destUri.getKey());
             destinationBlobClient.beginCopy(sourceBlobClient.getBlobUrl(), 
null);
-            System.out.println("Blob copied from " + origFilePath + " to " + 
destFilePath);
+            LOG.info("Blob copied from " + origFilePath + " to " + 
destFilePath);
             return Status.OK;
         } catch (BlobStorageException e) {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "Error occurred while copying the blob:: " + 
e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "copyObject from "
+                    + origFilePath + "to " + destFilePath + " failed: " + 
e.getMessage());
         }
     }
 
@@ -209,7 +270,9 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
     public RemoteObjects listObjects(String remotePath, String 
continuationToken) throws DdlException {
         try {
             ListBlobsOptions options = new 
ListBlobsOptions().setPrefix(remotePath);
-            PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, 
continuationToken, null);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            PagedIterable<BlobItem> pagedBlobs = 
getClient().getBlobContainerClient(uri.getBucket())
+                    .listBlobs(options, continuationToken, null);
             PagedResponse<BlobItem> pagedResponse = 
pagedBlobs.iterableByPage().iterator().next();
             List<RemoteObject> remoteObjects = new ArrayList<>();
 
@@ -222,6 +285,61 @@ public class AzureObjStorage implements 
ObjStorage<BlobContainerClient> {
         } catch (BlobStorageException e) {
             LOG.warn(String.format("Failed to list objects for S3: %s", 
remotePath), e);
             throw new DdlException("Failed to list objects for S3, Error 
message: " + e.getMessage(), e);
+        } catch (UserException e) {
+            LOG.warn(String.format("Failed to list objects for S3: %s", 
remotePath), e);
+            throw new DdlException("Failed to list objects for S3, Error 
message: " + e.getMessage(), e);
+        }
+    }
+
+    // Due to historical reasons, when the BE parses the object storage path.
+    // It assumes the path starts with 'S3://${containerName}'
+    // So here the path needs to be constructed in a format that BE can parse.
+    private String constructS3Path(String fileName, String bucket) throws 
UserException {
+        LOG.info("the path is {}", String.format("s3://%s/%s", bucket, 
fileName));
+        return String.format("s3://%s/%s", bucket, fileName);
+    }
+
+    public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
+            String globPath = uri.getKey();
+            LOG.info("try to glob list for azure, remote path {}, orig {}", 
globPath, remotePath);
+            BlobContainerClient client = 
getClient().getBlobContainerClient(uri.getBucket());
+            java.nio.file.Path pathPattern = Paths.get(globPath);
+            LOG.info("path pattern {}", pathPattern.toString());
+            PathMatcher matcher = 
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
+
+            ListBlobsOptions options = new 
ListBlobsOptions().setPrefix(globPath);
+            String newContinuationToken = null;
+            do {
+                PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, 
newContinuationToken, null);
+                PagedResponse<BlobItem> pagedResponse = 
pagedBlobs.iterableByPage().iterator().next();
+
+                for (BlobItem blobItem : pagedResponse.getElements()) {
+                    java.nio.file.Path blobPath = 
Paths.get(blobItem.getName());
+
+                    if (matcher.matches(blobPath)) {
+                        RemoteFile remoteFile = new RemoteFile(
+                                fileNameOnly ? 
blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
+                                        uri.getBucket()),
+                                !blobItem.isPrefix(),
+                                blobItem.isPrefix() ? -1 : 
blobItem.getProperties().getContentLength(),
+                                blobItem.getProperties().getContentLength(),
+                                
blobItem.getProperties().getLastModified().getSecond());
+                        result.add(remoteFile);
+                    }
+                }
+                newContinuationToken = pagedResponse.getContinuationToken();
+            } while (newContinuationToken != null);
+
+        } catch (BlobStorageException e) {
+            LOG.warn("glob file " + remotePath + " failed because azure error: 
" + e.getMessage());
+            return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + 
remotePath
+                    + " failed because azure error: " + e.getMessage());
+        } catch (Exception e) {
+            LOG.warn("errors while glob file " + remotePath, e);
+            return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob 
file " + remotePath + e.getMessage());
         }
+        return Status.OK;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
index 39dd9c2fe42..5004cfd2f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
@@ -23,20 +23,11 @@ import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.obj.AzureObjStorage;
 
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.core.http.rest.PagedResponse;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.models.ListBlobsOptions;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 
@@ -63,70 +54,9 @@ public class AzureFileSystem extends ObjFileSystem {
         return null;
     }
 
-    // To ensure compatibility with S3 usage, the path passed by the user 
still starts with 'S3://${containerName}'.
-    // For Azure, we need to remove this part.
-    private static String removeUselessSchema(String remotePath) {
-        String prefix = "s3://";
-
-        if (remotePath.startsWith(prefix)) {
-            remotePath = remotePath.substring(prefix.length());
-        }
-        // Remove the useless container name
-        int firstSlashIndex = remotePath.indexOf('/');
-        return remotePath.substring(firstSlashIndex + 1);
-    }
-
-    // Due to historical reasons, when the BE parses the object storage path.
-    // It assumes the path starts with 'S3://${containerName}'
-    // So here the path needs to be constructed in a format that BE can parse.
-    private String constructS3Path(String fileName) throws UserException {
-        BlobContainerClient client = (BlobContainerClient) 
getObjStorage().getClient();
-        String bucket = client.getBlobContainerName();
-        LOG.info("the path is {}", String.format("s3://%s/%s", bucket, 
fileName));
-        return String.format("s3://%s/%s", bucket, fileName);
-    }
-
     @Override
     public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
-        String copyPath = new String(remotePath);
-        copyPath = removeUselessSchema(copyPath);
-        LOG.info("try to glob list for azure, remote path {}, orig {}", 
copyPath, remotePath);
-        try {
-            BlobContainerClient client = (BlobContainerClient) 
getObjStorage().getClient();
-            java.nio.file.Path pathPattern = Paths.get(copyPath);
-            LOG.info("path pattern {}", pathPattern.toString());
-            PathMatcher matcher = 
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
-
-            ListBlobsOptions options = new 
ListBlobsOptions().setPrefix(copyPath);
-            String newContinuationToken = null;
-            do {
-                PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, 
newContinuationToken, null);
-                PagedResponse<BlobItem> pagedResponse = 
pagedBlobs.iterableByPage().iterator().next();
-
-                for (BlobItem blobItem : pagedResponse.getElements()) {
-                    java.nio.file.Path blobPath = 
Paths.get(blobItem.getName());
-
-                    if (matcher.matches(blobPath)) {
-                        RemoteFile remoteFile = new RemoteFile(
-                                fileNameOnly ? 
blobPath.getFileName().toString() : constructS3Path(blobPath.toString()),
-                                !blobItem.isPrefix(),
-                                blobItem.isPrefix() ? -1 : 
blobItem.getProperties().getContentLength(),
-                                blobItem.getProperties().getContentLength(),
-                                
blobItem.getProperties().getLastModified().getSecond());
-                        result.add(remoteFile);
-                    }
-                }
-                newContinuationToken = pagedResponse.getContinuationToken();
-            } while (newContinuationToken != null);
-
-        } catch (BlobStorageException e) {
-            LOG.warn("glob file " + remotePath + " failed because azure error: 
" + e.getMessage());
-            return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + 
remotePath
-                    + " failed because azure error: " + e.getMessage());
-        } catch (Exception e) {
-            LOG.warn("errors while glob file " + remotePath, e);
-            return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob 
file " + remotePath + e.getMessage());
-        }
-        return Status.OK;
+        AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+        return azureObjStorage.globList(remotePath, result, fileNameOnly);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to