This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6fd536bc937 branch-3.0: [fix](azure) Add FE azure object storage UT and fix log #46201 (#46244) 6fd536bc937 is described below commit 6fd536bc93772cc38fa14191d0ee39b91d87d0fd Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Jan 2 11:34:11 2025 +0800 branch-3.0: [fix](azure) Add FE azure object storage UT and fix log #46201 (#46244) Cherry-picked from #46201 Co-authored-by: Gavin Chou <ga...@selectdb.com> --- .../org/apache/doris/fs/obj/AzureObjStorage.java | 13 +- .../apache/doris/fs/obj/AzureObjStorageTest.java | 259 +++++++++++++++++++++ 2 files changed, 268 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java index fb76ed20a45..780d2ab9fa3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java @@ -295,7 +295,7 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> { // It assumes the path starts with 'S3://${containerName}' // So here the path needs to be constructed in a format that BE can parse. private String constructS3Path(String fileName, String bucket) throws UserException { - LOG.info("the path is {}", String.format("s3://%s/%s", bucket, fileName)); + LOG.debug("the path is {}", String.format("s3://%s/%s", bucket, fileName)); return String.format("s3://%s/%s", bucket, fileName); } @@ -337,8 +337,7 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> { String newContinuationToken = null; do { roundCnt++; - PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null); - PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next(); + PagedResponse<BlobItem> pagedResponse = getPagedBlobItems(client, options, newContinuationToken); for (BlobItem blobItem : pagedResponse.getElements()) { elementCnt++; @@ -371,9 +370,15 @@ public class AzureObjStorage implements ObjStorage<BlobServiceClient> { long endTime = System.nanoTime(); long duration = endTime - startTime; LOG.info("process {} elements under prefix {} for {} round, match {} elements, take {} micro second", - remotePath, elementCnt, matchCnt, roundCnt, + remotePath, elementCnt, roundCnt, matchCnt, duration / 1000); } return st; } + + public PagedResponse<BlobItem> getPagedBlobItems(BlobContainerClient client, ListBlobsOptions options, + String newContinuationToken) { + PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null); + return pagedBlobs.iterableByPage().iterator().next(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/AzureObjStorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/AzureObjStorageTest.java new file mode 100644 index 00000000000..41f49b7eab1 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/AzureObjStorageTest.java @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.obj; + +import org.apache.doris.backup.Status; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.fs.remote.RemoteFile; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.rest.PagedResponse; +import com.azure.core.util.IterableStream; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.BlobItemProperties; +import com.azure.storage.blob.models.ListBlobsOptions; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class AzureObjStorageTest { + public static class I { + public String pattern; + public Long expectedMatchSize; + + public I(String pattern, long expectedMatchSize) { + this.pattern = pattern; + this.expectedMatchSize = expectedMatchSize; + } + } + + public static List<I> genInputs() { + List<I> inputs = new ArrayList<I>(); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/*/tmp*", 8196L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp*", 4098L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/*tmp*", 4098L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/**/tmp*", 20490L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/**/tmp*", 32784L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/*", 0L)); // no files at 1st level + inputs.add(new I("s3://gavin-test-jp/azure-test/2/*", 4098L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/2*/*", 4098L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/2/*I*", 591L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1", 0L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/2", 0L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/3", 0L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp.k*", 61L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp.[a-z]*", 1722L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/[12]/tmp.[a-z]*", 3444L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/2/tmp.ehJi0Y5QKt", 1L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/non-existed-dir", 0L)); + inputs.add(new I("s3://gavin-test-jp/azure-test/1/2/non-existed-file", 0L)); + inputs.add(new I("s3://gavin-test-jp/azure_backup/__palo_repository_azure_repo1/__ss_*/*", 1L)); + + return inputs; + } + + // this case relies on real azure client and issue HTTP requests, ignore it by default + // run it if you have storage account(ak), key(sk) and container(bucket) + @Test + @Ignore + public void testGlobList() { + Map<String, String> props = new HashMap<String, String>(); + props.put(S3Properties.ACCESS_KEY, "${account_name}"); + props.put(S3Properties.SECRET_KEY, "${key}"); + props.put(S3Properties.ENDPOINT, "https://blob.azure.windows.net"); + props.put(S3Properties.BUCKET, "${container}"); + + List<I> inputs = genInputs(); + inputs.stream().forEach(i -> { + AzureObjStorage azs = new AzureObjStorage(props); + List<RemoteFile> result = new ArrayList<RemoteFile>(); + boolean fileNameOnly = false; + // FIXME(gavin): Mock the result returned from azure blob to make this UT work when no aksk and network + Status st = azs.globList(i.pattern, result, fileNameOnly); + Assertions.assertTrue(st.ok()); + Assertions.assertEquals(i.expectedMatchSize, result.size()); + }); + } + + @Test + public void testGlobListWithMockedAzureStorage() { + Map<String, String> props = new HashMap<String, String>(); + props.put(S3Properties.ACCESS_KEY, "gavintestmocked"); + props.put(S3Properties.SECRET_KEY, "sksks"); + props.put(S3Properties.ENDPOINT, "https://blob.azure.windows.net"); + props.put(S3Properties.BUCKET, "gavin-test-mocked"); + + List<I> inputs = genInputs(); + inputs.stream().forEach(i -> { + AzureObjStorage azs = genMockedAzureObjStorage(4/*numBatches, numContinuations*/); + List<RemoteFile> result = new ArrayList<RemoteFile>(); + boolean fileNameOnly = false; + // FIXME(gavin): Mock the result returned from azure blob to make this UT work when no aksk and network + Status st = azs.globList(i.pattern, result, fileNameOnly); + Assertions.assertTrue(st.ok()); + Assertions.assertEquals(i.expectedMatchSize, result.size()); + for (int j = 0; j < result.size() && j < 10; ++j) { + System.out.println(result.get(j).getName()); + } + System.out.println("pattern: " + i.pattern + " matched " + result.size()); + System.out.println("===================="); + }); + } + + @Test + public void testFsGlob() { + for (I i : genInputs()) { + String pattern = i.pattern.substring(19); // remove prefix s3://gavin-test-jp/ + PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern); + List<String> matchedPaths = new ArrayList<>(); + for (String p : genObjectKeys()) { + java.nio.file.Path blobPath = Paths.get(p); + if (!matcher.matches(blobPath)) { + continue; + } + matchedPaths.add(p); + } + Assertions.assertEquals(i.expectedMatchSize, matchedPaths.size()); + } + } + + public static List<String> genObjectKeys() { + String fileSuffixes = "zMkg8GtbSL MaSolJd8sL gtujCOzywm XBSnMwdoa9 FTxFgeihCa k9mp5K23pY dxjWuBC0dz osaFdxo2mz h4PEGleain aGxFstkIZz lGZbBe35uE gRILmhSPVm Ta3S5IUQiC 730eZqyhum XUhMhqk0DG cL7VKXMQuN DOb5J5MUFV i2dg8BSuTE FIzNOcoekf N24tUXQ8ws c31UcFGP1S S7bkoinC5u CCOKe8YL1N b5qGztFktP C7M1G8mYIP 0JA5yp6VAt lfJpMUYDe5 vXVOGgbZD3 UtM44M7c6F gPcli9bsQ4 cKP1TAjXfC v5k8Wksy9k UNkJtnIWwV 4UB0gbsI9g U4yKzgRbkG XcYaYUubBH bNVYZKq422 AvUoK2bcwS 5t6QDuG5ox Hj8lPHhsZL 08KZbcvC8P 5nYfmpu0Xj [...] + List<String> ret = new ArrayList<>(); + for (String i : fileSuffixes.split(" ")) { + ret.add("azure-test/1/tmp." + i); + ret.add("azure-test/1/2/tmp." + i); + ret.add("azure-test/1/2/1/tmp." + i); + ret.add("azure-test/1/2/2/tmp." + i); + ret.add("azure-test/1/2/3/tmp." + i); + ret.add("azure-test/1/3/tmp." + i); + ret.add("azure-test/2/tmp." + i); + ret.add("azure-test/3/tmp." + i); + } + ret.add("azure_backup/__palo_repository_azure_repo1/__ss_yyq/__meta.ff9b669c1505f51993d5fb448a345811"); + ret.sort(String::compareTo); + return ret; + } + + @Test + public void testMockObj() { + AzureObjStorage azs = genMockedAzureObjStorage(2); + List<RemoteFile> result = new ArrayList<RemoteFile>(); + boolean fileNameOnly = false; + Status st = azs.globList("s3://gavin-test-jp/azure-test/1/*", result, fileNameOnly); + Assertions.assertTrue(st.ok()); + // for (RemoteFile i : result) { + // System.out.println(i.getName()); + // } + } + + /** + * Mock an AzureObjStorage of which getPagedBlobItems() will return objects and split into multiple batches + * for testing continuations + * + * @param numBatch control how many PagedResponse will return when calling getPagedBlobItems() + * @return a mocked AzureObjStorage + */ + public static AzureObjStorage genMockedAzureObjStorage(int numBatch) { + Map<String, String> props = new HashMap<String, String>(); + props.put(S3Properties.ACCESS_KEY, "gavintestus"); + props.put(S3Properties.SECRET_KEY, "sksksksksksksk"); + props.put(S3Properties.ENDPOINT, "https://blob.azure.windows.net"); + props.put(S3Properties.BUCKET, "gavin-test-us"); + AzureObjStorage azs = new AzureObjStorage(props); + List<String> allBlobKeys = genObjectKeys(); + final Integer[] batchIndex = {0}; // from 0 to numBatch + new MockUp<AzureObjStorage>(AzureObjStorage.class) { + @Mock + public PagedResponse<BlobItem> getPagedBlobItems(BlobContainerClient client, ListBlobsOptions options, + String newContinuationToken) { + batchIndex[0]++; + return new PagedResponse<BlobItem>() { + public final List<List<String>> batches = new ArrayList<List<String>>() { + { + List<String> all = allBlobKeys.stream().filter(x -> x.startsWith(options.getPrefix())) + .collect(Collectors.toList()); + int numPerBatch = all.size() / numBatch; + int cnt = 0; + for (int i = 0; i < numBatch; ++i) { + List<String> batch = new ArrayList<>(); + for (int j = 0; j < numPerBatch; ++j) { + batch.add(all.get(cnt++)); + } + add(batch); + } + for (; cnt < all.size(); ++cnt) { // remainder + get(size() - 1).add(all.get(cnt)); + } + } + }; + + @Override + public IterableStream<BlobItem> getElements() { + List<BlobItem> blobItems = new ArrayList<>(); + batches.get(batchIndex[0] - 1).forEach(x -> blobItems.add(new BlobItem().setName(x).setIsPrefix(false) + .setProperties(new BlobItemProperties().setContentLength(17L) + .setLastModified(OffsetDateTime.now())))); + return new IterableStream<>(blobItems); + } + + @Override + public String getContinuationToken() { + return (batchIndex[0] >= numBatch) ? null : "what ever it is"; + } + + @Override + public int getStatusCode() { + return 200; // OK + } + + @Override + public HttpHeaders getHeaders() { + return null; + } + + @Override + public HttpRequest getRequest() { + return null; + } + + @Override + public void close() throws IOException { + + } + }; + } + }; + return azs; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org