This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 922c7c7639c591fde744fee532a0e0f977b2da5e
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Thu Jul 4 23:21:46 2024 +0800

    [enhance](Azure) Use s3Uri to specify the object's bucket and key for azure in FE (#37308)

    Previously, when using S3 load against Azure Blob Storage, the user had to
    specify the s3.bucket property. The bucket information can actually be taken
    from the DATA INFILE URI, so the FE now derives the container and key from
    that URI.
---
 .../org/apache/doris/fs/obj/AzureObjStorage.java   | 154 ++++++++++++++++++---
 .../apache/doris/fs/remote/AzureFileSystem.java    |  74 +---------
 2 files changed, 138 insertions(+), 90 deletions(-)
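
The core of the change: an s3://-style URI already carries both the container (bucket) and the blob key, so the extra s3.bucket property is redundant on Azure. A minimal, hypothetical sketch of that idea using plain java.net.URI (not the Doris S3URI helper the patch actually calls; the container name and blob path below are made up):

    import java.net.URI;

    public class S3UriSketch {
        public static void main(String[] args) {
            // Hypothetical load URI of the form s3://<container>/<blob key>.
            URI uri = URI.create("s3://mycontainer/load/part-00000.csv");
            String bucket = uri.getHost();             // "mycontainer" -> Azure container name
            String key = uri.getPath().substring(1);   // "load/part-00000.csv" -> blob name
            System.out.println("bucket=" + bucket + ", key=" + key);
        }
    }

In the patch itself the parsing is done by S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri), and the container is resolved per call via getClient().getBlobContainerClient(uri.getBucket()); that is also why the cached client becomes an account-level BlobServiceClient built against https://%s.blob.core.windows.net instead of a container-scoped BlobContainerClient.
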
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
index 2d55f7dd477..358b66b44b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -20,7 +20,10 @@ package org.apache.doris.fs.obj;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3URI;
+import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.remote.RemoteFile;
 
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.http.rest.PagedResponse;
@@ -28,7 +31,8 @@ import com.azure.core.http.rest.Response;
 import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.batch.BlobBatch;
 import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.batch.BlobBatchClientBuilder;
@@ -45,23 +49,41 @@ import org.jetbrains.annotations.Nullable;
 
 import java.io.File;
 import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
+public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
     private static final Logger LOG = LogManager.getLogger(AzureObjStorage.class);
+    private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net";
     protected Map<String, String> properties;
-    private BlobContainerClient client;
+    private BlobServiceClient client;
+    private boolean isUsePathStyle = false;
 
-    private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s";
+    private boolean forceParsingByStandardUri = false;
 
     public AzureObjStorage(Map<String, String> properties) {
         this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
         setProperties(properties);
     }
 
+    // To ensure compatibility with S3 usage, the path passed by the user still starts with 'S3://${containerName}'.
+    // For Azure, we need to remove this part.
+    private static String removeUselessSchema(String remotePath) {
+        String prefix = "s3://";
+
+        if (remotePath.startsWith(prefix)) {
+            remotePath = remotePath.substring(prefix.length());
+        }
+        // Remove the useless container name
+        int firstSlashIndex = remotePath.indexOf('/');
+        return remotePath.substring(firstSlashIndex + 1);
+    }
+
     public Map<String, String> getProperties() {
         return properties;
     }
@@ -73,16 +95,29 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
         } catch (DdlException e) {
             throw new IllegalArgumentException(e);
         }
+        // Virtual hosted-style is recommended in the s3 protocol.
+        // The path-style has been abandoned, but for some unexplainable reasons,
+        // the s3 client will determine whether the endpoint starts with `s3`
+        // when generating a virtual hosted-style request.
+        // If not, it will not be converted (https://github.com/aws/aws-sdk-java-v2/pull/763),
+        // but the endpoints of many cloud service providers for object storage do not start with s3,
+        // so they cannot be converted to virtual hosted-style.
+        // Some of them, such as Aliyun's OSS, only support virtual hosted-style,
+        // and some of them (Ceph) may only support path-style,
+        // so we need to do some additional conversion.
+        isUsePathStyle = this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
+                .equalsIgnoreCase("true");
+        forceParsingByStandardUri = this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI,
+                "false").equalsIgnoreCase("true");
     }
 
     @Override
-    public BlobContainerClient getClient() throws UserException {
+    public BlobServiceClient getClient() throws UserException {
         if (client == null) {
-            String containerName = properties.get(S3Properties.BUCKET);
-            String uri = String.format(URI_TEMPLATE, properties.get(S3Properties.ACCESS_KEY), containerName);
+            String uri = String.format(URI_TEMPLATE, properties.get(S3Properties.ACCESS_KEY));
             StorageSharedKeyCredential cred = new StorageSharedKeyCredential(properties.get(S3Properties.ACCESS_KEY),
                     properties.get(S3Properties.SECRET_KEY));
-            BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
+            BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
             builder.credential(cred);
             builder.endpoint(uri);
             client = builder.buildClient();
@@ -97,8 +132,9 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
 
     @Override
     public Status headObject(String remotePath) {
-        BlobClient blobClient = client.getBlobClient(remotePath);
         try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             BlobProperties properties = blobClient.getProperties();
             LOG.info("head file {} success: {}", remotePath, properties.toString());
             return Status.OK;
@@ -110,13 +146,17 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
                 return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
                         + remotePath + " failed: " + e.getMessage());
             }
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "headObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status getObject(String remoteFilePath, File localFile) {
         try {
-            BlobClient blobClient = client.getBlobClient(remoteFilePath);
+            S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             BlobProperties properties = blobClient.downloadToFile(localFile.getAbsolutePath());
             LOG.info("get file " + remoteFilePath + " success: " + properties.toString());
             return Status.OK;
@@ -124,26 +164,34 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "get file from azure error: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "getObject "
+                    + remoteFilePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status putObject(String remotePath, @Nullable InputStream content, long contentLength) {
         try {
-            BlobClient blobClient = client.getBlobClient(remotePath);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             blobClient.upload(content, contentLength);
             return Status.OK;
         } catch (BlobStorageException e) {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "Error occurred while copying the blob:: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "putObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status deleteObject(String remotePath) {
         try {
-            BlobClient blobClient = client.getBlobClient(remotePath);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
             blobClient.delete();
             LOG.info("delete file " + remotePath + " success");
             return Status.OK;
@@ -151,12 +199,18 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "get file from azure error: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "deleteObject "
+                    + remotePath + " failed: " + e.getMessage());
         }
     }
 
     @Override
     public Status deleteObjects(String remotePath) {
         try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobContainerClient blobClient = getClient().getBlobContainerClient(uri.getBucket());
+            String containerUrl = blobClient.getBlobContainerUrl();
             String continuationToken = "";
             boolean isTruncated = false;
             long totalObjects = 0;
@@ -165,11 +219,11 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
                 List<RemoteObject> objectList = objects.getObjectList();
                 if (!objectList.isEmpty()) {
                     BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
-                            client.getServiceClient()).buildClient();
+                            getClient()).buildClient();
                     BlobBatch blobBatch = blobBatchClient.getBlobBatch();
 
                     for (RemoteObject blob : objectList) {
-                        blobBatch.deleteBlob(client.getBlobContainerUrl(), blob.getKey());
+                        blobBatch.deleteBlob(containerUrl, blob.getKey());
                     }
                     Response<Void> resp = blobBatchClient.submitBatchWithResponse(blobBatch, true, null, Context.NONE);
                     LOG.info("{} objects deleted for dir {} return http code {}",
@@ -193,15 +247,22 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
 
     @Override
     public Status copyObject(String origFilePath, String destFilePath) {
         try {
-            BlobClient sourceBlobClient = client.getBlobClient(origFilePath);
-            BlobClient destinationBlobClient = client.getBlobClient(destFilePath);
+            S3URI origUri = S3URI.create(origFilePath, isUsePathStyle, forceParsingByStandardUri);
+            S3URI destUri = S3URI.create(destFilePath, isUsePathStyle, forceParsingByStandardUri);
+            BlobClient sourceBlobClient = getClient().getBlobContainerClient(origUri.getBucket())
+                    .getBlobClient(origUri.getKey());
+            BlobClient destinationBlobClient = getClient().getBlobContainerClient(destUri.getBucket())
+                    .getBlobClient(destUri.getKey());
             destinationBlobClient.beginCopy(sourceBlobClient.getBlobUrl(), null);
-            System.out.println("Blob copied from " + origFilePath + " to " + destFilePath);
+            LOG.info("Blob copied from " + origFilePath + " to " + destFilePath);
             return Status.OK;
         } catch (BlobStorageException e) {
             return new Status(
                     Status.ErrCode.COMMON_ERROR,
                     "Error occurred while copying the blob:: " + e.getServiceMessage());
+        } catch (UserException e) {
+            return new Status(Status.ErrCode.COMMON_ERROR, "copyObject from "
+                    + origFilePath + " to " + destFilePath + " failed: " + e.getMessage());
         }
     }
 
@@ -209,7 +270,9 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
     public RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException {
         try {
             ListBlobsOptions options = new ListBlobsOptions().setPrefix(remotePath);
-            PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            PagedIterable<BlobItem> pagedBlobs = getClient().getBlobContainerClient(uri.getBucket())
+                    .listBlobs(options, continuationToken, null);
             PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
 
             List<RemoteObject> remoteObjects = new ArrayList<>();
@@ -222,6 +285,61 @@ public class AzureObjStorage implements ObjStorage<BlobContainerClient> {
         } catch (BlobStorageException e) {
             LOG.warn(String.format("Failed to list objects for S3: %s", remotePath), e);
             throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e);
+        } catch (UserException e) {
+            LOG.warn(String.format("Failed to list objects for S3: %s", remotePath), e);
+            throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e);
         }
     }
+
+    // Due to historical reasons, when the BE parses the object storage path,
+    // it assumes the path starts with 'S3://${containerName}',
+    // so here the path needs to be constructed in a format that BE can parse.
+    private String constructS3Path(String fileName, String bucket) throws UserException {
+        LOG.info("the path is {}", String.format("s3://%s/%s", bucket, fileName));
+        return String.format("s3://%s/%s", bucket, fileName);
+    }
+
+    public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            String globPath = uri.getKey();
+            LOG.info("try to glob list for azure, remote path {}, orig {}", globPath, remotePath);
+            BlobContainerClient client = getClient().getBlobContainerClient(uri.getBucket());
+            java.nio.file.Path pathPattern = Paths.get(globPath);
+            LOG.info("path pattern {}", pathPattern.toString());
+            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
+
+            ListBlobsOptions options = new ListBlobsOptions().setPrefix(globPath);
+            String newContinuationToken = null;
+            do {
+                PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
+                PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
+
+                for (BlobItem blobItem : pagedResponse.getElements()) {
+                    java.nio.file.Path blobPath = Paths.get(blobItem.getName());
+
+                    if (matcher.matches(blobPath)) {
+                        RemoteFile remoteFile = new RemoteFile(
+                                fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
+                                        uri.getBucket()),
+                                !blobItem.isPrefix(),
+                                blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
+                                blobItem.getProperties().getContentLength(),
+                                blobItem.getProperties().getLastModified().getSecond());
+                        result.add(remoteFile);
+                    }
+                }
+                newContinuationToken = pagedResponse.getContinuationToken();
+            } while (newContinuationToken != null);
+
+        } catch (BlobStorageException e) {
+            LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage());
+            return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
+                    + " failed because azure error: " + e.getMessage());
+        } catch (Exception e) {
+            LOG.warn("errors while glob file " + remotePath, e);
+            return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob file " + remotePath + e.getMessage());
+        }
+        return Status.OK;
+    }
 }
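
The globList method moved into AzureObjStorage above matches each listed blob name against the key part of the load URI, using java.nio.file.PathMatcher with "glob:" syntax. A self-contained sketch of just that matching step (the pattern and blob names are hypothetical, and the paginated listBlobs loop from the hunk above is omitted):

    import java.nio.file.FileSystems;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;

    public class GlobMatchSketch {
        public static void main(String[] args) {
            // Same construction as in globList: a glob pattern over the blob key.
            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:load/2024-07/*.csv");
            String[] blobNames = {
                    "load/2024-07/part-0.csv",   // matches
                    "load/2024-07/part-1.csv",   // matches
                    "load/2024-07/part-0.json",  // rejected: different extension
                    "load/2024-08/part-0.csv"    // rejected: different directory
            };
            for (String name : blobNames) {
                System.out.println(name + " -> " + matcher.matches(Paths.get(name)));
            }
        }
    }

Blobs that pass the matcher are wrapped in RemoteFile objects; unless fileNameOnly is set, the path is rebuilt as s3://<container>/<key> by constructS3Path so the BE can still parse it.
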
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
index 39dd9c2fe42..5004cfd2f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java
@@ -23,20 +23,11 @@ import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.obj.AzureObjStorage;
 
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.core.http.rest.PagedResponse;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.BlobStorageException;
-import com.azure.storage.blob.models.ListBlobsOptions;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 
@@ -63,70 +54,9 @@ public class AzureFileSystem extends ObjFileSystem {
         return null;
     }
 
-    // To ensure compatibility with S3 usage, the path passed by the user still starts with 'S3://${containerName}'.
-    // For Azure, we need to remove this part.
-    private static String removeUselessSchema(String remotePath) {
-        String prefix = "s3://";
-
-        if (remotePath.startsWith(prefix)) {
-            remotePath = remotePath.substring(prefix.length());
-        }
-        // Remove the useless container name
-        int firstSlashIndex = remotePath.indexOf('/');
-        return remotePath.substring(firstSlashIndex + 1);
-    }
-
-    // Due to historical reasons, when the BE parses the object storage path.
-    // It assumes the path starts with 'S3://${containerName}'
-    // So here the path needs to be constructed in a format that BE can parse.
-    private String constructS3Path(String fileName) throws UserException {
-        BlobContainerClient client = (BlobContainerClient) getObjStorage().getClient();
-        String bucket = client.getBlobContainerName();
-        LOG.info("the path is {}", String.format("s3://%s/%s", bucket, fileName));
-        return String.format("s3://%s/%s", bucket, fileName);
-    }
-
     @Override
     public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
-        String copyPath = new String(remotePath);
-        copyPath = removeUselessSchema(copyPath);
-        LOG.info("try to glob list for azure, remote path {}, orig {}", copyPath, remotePath);
-        try {
-            BlobContainerClient client = (BlobContainerClient) getObjStorage().getClient();
-            java.nio.file.Path pathPattern = Paths.get(copyPath);
-            LOG.info("path pattern {}", pathPattern.toString());
-            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());
-
-            ListBlobsOptions options = new ListBlobsOptions().setPrefix(copyPath);
-            String newContinuationToken = null;
-            do {
-                PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
-                PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
-
-                for (BlobItem blobItem : pagedResponse.getElements()) {
-                    java.nio.file.Path blobPath = Paths.get(blobItem.getName());
-
-                    if (matcher.matches(blobPath)) {
-                        RemoteFile remoteFile = new RemoteFile(
-                                fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString()),
-                                !blobItem.isPrefix(),
-                                blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
-                                blobItem.getProperties().getContentLength(),
-                                blobItem.getProperties().getLastModified().getSecond());
-                        result.add(remoteFile);
-                    }
-                }
-                newContinuationToken = pagedResponse.getContinuationToken();
-            } while (newContinuationToken != null);
-
-        } catch (BlobStorageException e) {
-            LOG.warn("glob file " + remotePath + " failed because azure error: " + e.getMessage());
-            return new Status(Status.ErrCode.COMMON_ERROR, "glob file " + remotePath
-                    + " failed because azure error: " + e.getMessage());
-        } catch (Exception e) {
-            LOG.warn("errors while glob file " + remotePath, e);
-            return new Status(Status.ErrCode.COMMON_ERROR, "errors while glob file " + remotePath + e.getMessage());
-        }
-        return Status.OK;
+        AzureObjStorage azureObjStorage = (AzureObjStorage) getObjStorage();
+        return azureObjStorage.globList(remotePath, result, fileNameOnly);
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org