This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
     new b9329d27d8 Better test coverage - usage, examples, operations
b9329d27d8 is described below

commit b9329d27d8f9ed1b8f64420d6df9eb04cea9c455
Author: JiriOndrusek <ondrusek.j...@gmail.com>
AuthorDate: Fri Jun 20 07:59:54 2025 +0200

    Better test coverage - usage, examples, operations

    Fixes #7445
---
 .../deployment/AzureCoreSupportProcessor.java      |  17 ++
 .../deployment/AzureStorageDatalakeProcessor.java  |   8 +-
 .../azure/azure-storage-datalake/pom.xml           |  39 +++
 .../datalake/it/AzureStorageDatalakeResource.java  |  62 ++++
 .../datalake/it/AzureStorageDatalakeRoutes.java    | 162 +++++++++++
 .../datalake/it/AzureStorageDatalakeUtil.java      |   0
 .../datalake/it/AzureStorageDatalakeTest.java      | 311 ++++++++++++++++++++-
 .../it/AzureStorageDatalakeTestResource.java       |  33 ++-
 integration-tests/azure-grouped/pom.xml            |  17 ++
 9 files changed, 640 insertions(+), 9 deletions(-)

diff --git a/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java b/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
index b603a2a2c9..6b8fadbe25 100644
--- a/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
+++ b/extensions-support/azure-core/deployment/src/main/java/org/apache/camel/quarkus/support/reactor/netty/deployment/AzureCoreSupportProcessor.java
@@ -17,6 +17,7 @@ package org.apache.camel.quarkus.support.reactor.netty.deployment;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
@@ -25,6 +26,8 @@ import java.util.stream.Stream;
 import com.azure.core.annotation.ServiceInterface;
 import com.azure.core.exception.HttpResponseException;
 import com.azure.core.http.HttpClientProvider;
+import com.azure.json.JsonSerializable;
+import com.azure.xml.XmlSerializable;
 import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
@@ -71,6 +74,20 @@ public class AzureCoreSupportProcessor {
         reflectiveClasses.produce(ReflectiveClassBuildItem.builder(httpResponseExceptionClasses.toArray(new String[0]))
                 .methods()
                 .build());
+
+        // implementations of serializers are used during error reporting
+        LinkedHashSet<String> serializers = new LinkedHashSet<>(
+                combinedIndex.getIndex().getAllKnownImplementations(JsonSerializable.class).stream()
+                        .map(ci -> ci.name().toString())
+                        .toList());
+        serializers.addAll(combinedIndex.getIndex().getAllKnownImplementations(XmlSerializable.class).stream()
+                .map(ci -> ci.name().toString())
+                .toList());
+
+        reflectiveClasses.produce(ReflectiveClassBuildItem.builder(serializers.toArray(new String[0]))
+                .methods()
+                .fields()
+                .build());
     }
 
     @BuildStep
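The build step above follows a common Quarkus pattern: query the Jandex class index for every implementation of an interface and register the results for runtime reflection, so native-image builds keep the members the Azure SDK reaches reflectively when serializing error responses. A minimal standalone sketch of the same pattern, assuming a hypothetical marker interface com.example.Payload (the name is illustrative, not part of this commit):

    import io.quarkus.deployment.annotations.BuildStep;
    import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
    import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;

    public class PayloadReflectionProcessor {

        @BuildStep
        ReflectiveClassBuildItem registerPayloadImplementations(CombinedIndexBuildItem combinedIndex) {
            // Jandex lists every indexed implementation of the (hypothetical) interface
            String[] names = combinedIndex.getIndex()
                    .getAllKnownImplementations(com.example.Payload.class).stream()
                    .map(ci -> ci.name().toString())
                    .toArray(String[]::new);
            // methods() and fields() keep members reachable for runtime reflection
            return ReflectiveClassBuildItem.builder(names).methods().fields().build();
        }
    }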
diff --git a/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java b/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
index 1bc96b0117..212e481698 100644
--- a/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
+++ b/extensions/azure-storage-datalake/deployment/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/deployment/AzureStorageDatalakeProcessor.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.quarkus.component.azure.storage.datalake.deployment;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.LinkedHashSet;
 
 import com.azure.core.annotation.ServiceInterface;
 import io.quarkus.deployment.annotations.BuildProducer;
@@ -54,10 +52,10 @@ class AzureStorageDatalakeProcessor {
     ReflectiveClassBuildItem registerForReflection(CombinedIndexBuildItem combinedIndex) {
         IndexView index = combinedIndex.getIndex();
 
-        List<String> dtos = new LinkedList<>(index.getKnownClasses().stream()
+        LinkedHashSet<String> dtos = new LinkedHashSet<>(index.getKnownClasses().stream()
                 .map(ci -> ci.name().toString())
                 .filter(n -> n.startsWith("com.azure.storage.file.datalake.implementation.models"))
-                .collect(Collectors.toList()));
+                .toList());
 
         dtos.add("com.azure.storage.file.datalake.implementation.ServicesImpl$ServicesService");
 
diff --git a/integration-test-groups/azure/azure-storage-datalake/pom.xml b/integration-test-groups/azure/azure-storage-datalake/pom.xml
index 691116ea99..9dd6798ba3 100644
--- a/integration-test-groups/azure/azure-storage-datalake/pom.xml
+++ b/integration-test-groups/azure/azure-storage-datalake/pom.xml
@@ -35,6 +35,14 @@
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-azure-storage-datalake</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-file</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-direct</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy</artifactId>
@@ -60,6 +68,11 @@
             <artifactId>camel-quarkus-integration-tests-support-azure</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
@@ -85,6 +98,32 @@
                         </exclusion>
                     </exclusions>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-direct-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-file-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
             </dependencies>
         </profile>
        <profile>
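Both processors now collect class names into a LinkedHashSet rather than a List: duplicates coming from overlapping index queries are dropped, while first-insertion order is preserved, so the set of reflectively registered classes stays deterministic across builds. The property in isolation (a standalone sketch with illustrative values):

    import java.util.LinkedHashSet;
    import java.util.List;

    public class LinkedHashSetDemo {
        public static void main(String[] args) {
            // the duplicate "a.Dto" is silently dropped; first-insertion order is kept
            LinkedHashSet<String> dtos = new LinkedHashSet<>(List.of("a.Dto", "b.Dto", "a.Dto"));
            dtos.add("c.Dto");
            System.out.println(dtos); // prints [a.Dto, b.Dto, c.Dto]
        }
    }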
diff --git a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
index 699a1dd25a..a3edb33a83 100644
--- a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
+++ b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeResource.java
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.quarkus.component.azure.storage.datalake.it;
 
+import java.io.ByteArrayOutputStream;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -38,9 +41,12 @@ import jakarta.ws.rs.POST;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response;
+import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
 import org.apache.camel.component.azure.storage.datalake.DataLakeOperationsDefinition;
@@ -56,6 +62,9 @@ public class AzureStorageDatalakeResource {
     @Inject
     ConsumerTemplate consumerTemplate;
 
+    @Inject
+    CamelContext camelContext;
+
     @ConfigProperty(name = "azure.storage.account-name")
     Optional<String> azureStorageAccountName;
 
@@ -169,6 +178,53 @@ public class AzureStorageDatalakeResource {
                 10000, String.class);
     }
 
+    @Path("/route/{route}/filesystem/{filesystem}")
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    public Object consumer(@PathParam("route") String routeName,
+            @PathParam("filesystem") String filesystem,
+            @QueryParam("useOutputStream") boolean useOutputStream,
+            Map<String, Object> headers) throws Exception {
+
+        ByteArrayOutputStream inMemoryStream = new ByteArrayOutputStream();
+
+        Map<String, Object> _headers = new HashMap<>();
+        if (headers != null) {
+            _headers.putAll(headers);
+        }
+        _headers.put("filesystemName", filesystem);
+        _headers.put("accountName", azureStorageAccountName.get());
+
+        Exchange exchange = producerTemplate.request(
+                "direct:" + routeName,
+                e -> {
+                    e.getIn().setHeaders(_headers);
+                    if (useOutputStream && "datalakeGetFile".equals(routeName)) {
+                        e.getIn().setBody(inMemoryStream);
+                    }
+                });
+
+        Object o = exchange.getIn().getBody();
+        switch (routeName) {
+        case "datalakeListFileSystem":
+            return ((List<FileSystemItem>) o).stream()
+                    .map(FileSystemItem::getName)
+                    .collect(Collectors.toList());
+        case "datalakeListPaths":
+            return ((List<PathItem>) o).stream()
+                    .map(PathItem::getName)
+                    .collect(Collectors.toList());
+        case "datalakeGetFile":
+            if (useOutputStream) {
+                return inMemoryStream.toString();
+            }
+            break;
+        }
+
+        return exchange.getIn().getBody(String.class);
+    }
+
     private String componentUri(final String filesystem, final DataLakeOperationsDefinition operation) {
         return String.format("azure-storage-datalake://%s%s?serviceClient=#azureDatalakeServiceClient&operation=%s",
                 azureStorageAccountName,
@@ -176,4 +232,10 @@ public class AzureStorageDatalakeResource {
                 operation.name());
     }
 
+    @Path("/start/{routeId}")
+    @GET
+    public Response startRoute(@PathParam("routeId") String routeId) throws Exception {
+        camelContext.getRouteController().startRoute(routeId);
+        return Response.ok().build();
+    }
 }
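The new /route/{route}/filesystem/{filesystem} endpoint dispatches to any of the direct: routes defined below by name, passing the JSON request body through as Camel headers. It relies on ProducerTemplate.request(...), which sends a synchronous in-out exchange and returns it for inspection. The idiom in isolation (a minimal sketch; the header values are illustrative):

    import org.apache.camel.Exchange;
    import org.apache.camel.ProducerTemplate;

    public class RequestIdiom {
        String listPaths(ProducerTemplate producerTemplate) {
            // send a synchronous exchange into the route and inspect the reply
            Exchange exchange = producerTemplate.request("direct:datalakeListPaths", e -> {
                e.getIn().setHeader("accountName", "myaccount");       // illustrative
                e.getIn().setHeader("filesystemName", "myfilesystem"); // illustrative
            });
            // Camel's type converters coerce the reply body to String where possible
            return exchange.getIn().getBody(String.class);
        }
    }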
diff --git a/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java
new file mode 100644
index 0000000000..d184524d2d
--- /dev/null
+++ b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeRoutes.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.storage.datalake.it;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.azure.storage.file.datalake.models.ListFileSystemsOptions;
+import com.azure.storage.file.datalake.options.FileQueryOptions;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+@ApplicationScoped
+public class AzureStorageDatalakeRoutes extends RouteBuilder {
+
+    public static final String FILE_NAME = "operations.txt";
+    public static final String FILE_NAME2 = "test/file.txt";
+    public static final String CONSUMER_FILE_NAME = "file_for_download.txt";
+    public static final String CONSUMER_FILE_NAME2 = "file_for_download2.txt";
+    private static final String CLIENT_SUFFIX = "&serviceClient=#azureDatalakeServiceClient";
+
+    @Override
+    public void configure() throws Exception {
+
+        String tmpFolder = ConfigProvider.getConfig().getValue("cqDatalakeTmpFolder", String.class);
+        String consumerFilesystem = ConfigProvider.getConfig().getValue("cqCDatalakeConsumerFilesystem", String.class);
+
+        /* Consumer examples */
+
+        //Consume a file from the storage datalake into a file using the file component
+        from("azure-storage-datalake://" + AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+                + "?fileName=" + CONSUMER_FILE_NAME
+                + CLIENT_SUFFIX)
+                .routeId("consumeWithFileComponent")
+                .autoStartup(false)
+                .to("file:" + tmpFolder + "/consumer-files?fileName=" + CONSUMER_FILE_NAME);
+
+        //write to a file without using the file component
+        from("azure-storage-datalake://" + AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+                + "?fileName=" + CONSUMER_FILE_NAME2 + "&fileDir=" + tmpFolder + "/consumer-files&delay=3000000"
+                + CLIENT_SUFFIX)
+                .routeId("consumeWithoutFileComponent")
+                .autoStartup(false)
+                .log("File downloaded");
+
+        //batch consumer
+        from("azure-storage-datalake://" + AzureStorageDatalakeUtil.getRealAccountKeyFromEnv() + "/" + consumerFilesystem
+                + "?fileDir=" + tmpFolder + "/consumer-files/batch&path=/&delay=3000000" + CLIENT_SUFFIX)
.routeId("consumeBatch") + .autoStartup(false) + .log("File downloaded"); + + /* Producer examples */ + + //listFileSystem + from("direct:datalakeListFileSystem") + .process(exchange -> { + exchange.getIn().setHeader(DataLakeConstants.LIST_FILESYSTEMS_OPTIONS, + new ListFileSystemsOptions().setMaxResultsPerPage(10)); + }) + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=listFileSystem" + + CLIENT_SUFFIX); + + //createFileSystem + from("direct:datalakeCreateFilesystem") + .toD("azure-storage-datalake://${header.accountName}?operation=createFileSystem" + CLIENT_SUFFIX); + + //listPaths + from("direct:datalakeListPaths") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=listPaths" + + CLIENT_SUFFIX); + + //getFile + from("direct:datalakeGetFile") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=getFile&fileName=${header.fileName}" + + CLIENT_SUFFIX); + + //deleteFile + from("direct:datalakeDeleteFile") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=deleteFile&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + //downloadToFile + from("direct:datalakeDownloadToFile") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=downloadToFile&fileName=" + + FILE_NAME + "&fileDir=${header.tmpFolder}" + CLIENT_SUFFIX); + + //downloadLink + from("direct:datalakeDownloadLink") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=downloadLink&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + //appendToFile + from("direct:datalakeAppendToFile") + .process(exchange -> { + final String data = exchange.getIn().getHeader("append", String.class); + final InputStream inputStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + exchange.getIn().setBody(inputStream); + }) + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=appendToFile&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + //flushToFile + from("direct:datalakeFlushToFile") + .process(exchange -> { + exchange.getIn().setHeader(DataLakeConstants.POSITION, 8); + }) + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=flushToFile&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + //openQueryInputStream + from("direct:openQueryInputStream") + .process(exchange -> { + exchange.getIn().setHeader(DataLakeConstants.QUERY_OPTIONS, + new FileQueryOptions("SELECT * from BlobStorage")); + }) + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=openQueryInputStream&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + //upload + from("direct:datalakeUpload") + .process(exchange -> { + String fileContent = exchange.getIn().getHeader("fileContent", String.class); + final InputStream inputStream = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8)); + exchange.getIn().setBody(inputStream); + }) + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=upload&fileName=" + + FILE_NAME + CLIENT_SUFFIX); + + // uploadFromFile + from("direct:datalakeUploadFromFile") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=uploadFromFile&fileName=" + + FILE_NAME2 + CLIENT_SUFFIX); + + // createFile + from("direct:datalakeCreateFile") + 
.toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=createFile&fileName=${header.fileName}" + + CLIENT_SUFFIX); + + //deleteDirectory + from("direct:datalakeDeleteDirectory") + .toD("azure-storage-datalake://${header.accountName}/${header.filesystemName}?operation=deleteDirectory" + + CLIENT_SUFFIX); + } +} diff --git a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java b/integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java similarity index 100% rename from integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java rename to integration-test-groups/azure/azure-storage-datalake/src/main/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeUtil.java diff --git a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java index 9bf9b0c66f..978331a9ff 100644 --- a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java +++ b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTest.java @@ -16,16 +16,30 @@ */ package org.apache.camel.quarkus.component.azure.storage.datalake.it; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import org.apache.camel.component.azure.storage.datalake.DataLakeConstants; import org.apache.commons.lang3.RandomStringUtils; +import org.eclipse.microprofile.config.ConfigProvider; import org.hamcrest.Matchers; import org.jboss.logging.Logger; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.awaitility.Awaitility; //Disable tests dynamically in beforeEach method, to reflect preferred env name in case they are used (see README.adoc) @QuarkusTest @@ -43,12 +57,13 @@ class AzureStorageDatalakeTest { public void beforeEach() { Assumptions.assumeTrue(AzureStorageDatalakeUtil.isRalAccountProvided(), "Azure Data Lake credentials were not provided"); + } @Test public void crud() { - final String filesystem = "cqfs" + RandomStringUtils.randomNumeric(16); - final String filename = "file" + RandomStringUtils.randomNumeric(16); + final String filesystem = "cqfscrud" + RandomStringUtils.randomNumeric(16); + final String filename = "file.txt"; /* The filesystem does not exist initially */ RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem) @@ -122,4 +137,296 @@ class AzureStorageDatalakeTest { } } + + @Test + public void 
+    @Test
+    public void consumerRoutes() throws IOException {
+        final String filename = AzureStorageDatalakeRoutes.CONSUMER_FILE_NAME;
+        final String filename2 = AzureStorageDatalakeRoutes.CONSUMER_FILE_NAME2;
+        String filesystem = ConfigProvider.getConfig().getValue("cqCDatalakeConsumerFilesystem", String.class);
+        final String content = "Hello from download test! " + RandomStringUtils.randomNumeric(16);
+        final String tmpFolder = ConfigProvider.getConfig().getValue("cqDatalakeTmpFolder", String.class);
+
+        /* The filesystem does not exist initially */
+        RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem)
+                .then()
+                .statusCode(200)
+                .body("", Matchers.not(Matchers.hasItem(filesystem)));
+
+        try {
+            /* Create the filesystem */
+            RestAssured.given()
+                    .post("/azure-storage-datalake/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(201);
+
+            /* Upload */
+            RestAssured.given()
+                    .body(content)
+                    .post("/azure-storage-datalake/filesystem/" + filesystem + "/path/" + filename)
+                    .then()
+                    .statusCode(201);
+
+            LOG.info("Consume a file from the storage datalake into a file using the file component");
+            RestAssured.get("/azure-storage-datalake/start/consumeWithFileComponent")
+                    .then()
+                    .statusCode(200);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).untilAsserted(
+                    () -> {
+                        Path downloadedFilePath = Path.of(tmpFolder, "consumer-files").resolve(filename);
+                        Assertions.assertTrue(downloadedFilePath.toFile().exists());
+                        Assertions.assertEquals(content, Files.readString(downloadedFilePath));
+                    });
+
+            LOG.info("write to a file without using the file component");
+
+            /* Upload */
+            RestAssured.given()
+                    .body(content)
+                    .post("/azure-storage-datalake/filesystem/" + filesystem + "/path/" + filename2)
+                    .then()
+                    .statusCode(201);
+            RestAssured.get("/azure-storage-datalake/start/consumeWithoutFileComponent")
+                    .then()
+                    .statusCode(200);
+            Awaitility.await().pollInterval(5, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).untilAsserted(
+                    () -> {
+                        Path downloadedFilePath = Path.of(tmpFolder, "consumer-files", filename2);
+                        Assertions.assertTrue(downloadedFilePath.toFile().exists());
+                        Assertions.assertEquals(content, Files.readString(downloadedFilePath));
+                    });
+
+            LOG.info("batch consumer");
+            Assertions.assertTrue(Path.of(tmpFolder, "consumer-files", "batch").toFile().mkdir(),
+                    "Folder for batch consumer has to exist");
+
+            RestAssured.get("/azure-storage-datalake/start/consumeBatch")
+                    .then()
+                    .statusCode(200);
+            Awaitility.await().pollInterval(5, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).untilAsserted(
+                    () -> {
+                        Path downloadedFilePath = Path.of(tmpFolder, "consumer-files", "batch").resolve(filename);
+                        Assertions.assertTrue(downloadedFilePath.toFile().exists());
+                        Assertions.assertEquals(content, Files.readString(downloadedFilePath));
+                        Path downloadedFilePath2 = Path.of(tmpFolder, "consumer-files", "batch", filename2);
+                        Assertions.assertTrue(downloadedFilePath2.toFile().exists());
+                        Assertions.assertEquals(content, Files.readString(downloadedFilePath2));
+                    });
+
+        } finally {
+            /* Clean up */
+            RestAssured.given()
+                    .delete("/azure-storage-datalake/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(204);
+        }
+
+    }
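The checks above poll with Awaitility rather than sleeping: the downloaded file appears asynchronously once the just-started route fires, so each assertion is retried until it passes or the timeout expires. The pattern in isolation (a minimal sketch using the org.awaitility dependency added in the pom; the path is illustrative):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.concurrent.TimeUnit;
    import org.awaitility.Awaitility;
    import org.junit.jupiter.api.Assertions;

    public class AwaitDownload {
        void awaitFile(Path downloaded) {
            // retry the assertion every second for up to 10 seconds;
            // the last failure propagates if the file never appears
            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS)
                    .untilAsserted(() -> Assertions.assertTrue(Files.exists(downloaded)));
        }
    }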
RestAssured.get("/azure-storage-datalake/filesystem/" + filesystem) + .then() + .statusCode(200) + .body("", Matchers.not(Matchers.hasItem(filesystem))); + + try { + LOG.info("step - createFileSystem"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of(DataLakeConstants.FILESYSTEM_NAME, filesystem)) + .post("/azure-storage-datalake/route/datalakeCreateFilesystem/filesystem/" + filesystem) + .then() + .statusCode(200); + + LOG.info("step - listFileSystem"); + RestAssured.given() + .contentType(ContentType.JSON) + .post("/azure-storage-datalake/route/datalakeListFileSystem/filesystem/" + filesystem) + .then() + .statusCode(200) + .body("", Matchers.hasItem(filesystem)); + + LOG.info("step - upload"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of("fileContent", "Hello World from Camel!")) + .post("/azure-storage-datalake/route/datalakeUpload/filesystem/" + filesystem) + .then() + .statusCode(200); + + LOG.info("step - listPaths"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Collections.emptyMap()) + .post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" + filesystem) + .then() + .statusCode(200) + .body("", Matchers.hasItem(filename)); + + LOG.info("step - getFile - via OutputStream"); + RestAssured.given() + .contentType(ContentType.JSON) + .queryParam("useOutputStream", true) + .body(Map.of("fileName", AzureStorageDatalakeRoutes.FILE_NAME)) + .post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem) + .then() + .statusCode(200) + .body(Matchers.is("Hello World from Camel!")); + + LOG.info("step - getFile - via InputStream"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of("fileName", AzureStorageDatalakeRoutes.FILE_NAME)) + .post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem) + .then() + .statusCode(200) + .body(Matchers.is("Hello World from Camel!")); + + LOG.info("step - downloadToFile"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of("tmpFolder", tmpFolder)) + .post("/azure-storage-datalake/route/datalakeDownloadToFile/filesystem/" + filesystem) + .then() + .statusCode(200) + .body(Matchers.is("Hello World from Camel!")); + + Path path = Path.of(tmpFolder, filename); + Assertions.assertTrue(Files.exists(path)); + Assertions.assertEquals("Hello World from Camel!", Files.readString(path)); + + LOG.info("step - downloadLink"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Collections.emptyMap()) + .post("/azure-storage-datalake/route/datalakeDownloadLink/filesystem/" + filesystem) + .then() + .statusCode(200) + .body(Matchers.startsWith( + "https://" + ConfigProvider.getConfig().getValue("azure.storage.account-name", String.class))); + + LOG.info("step - appendToFile"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of("append", "appended")) + .post("/azure-storage-datalake/route/datalakeAppendToFile/filesystem/" + filesystem) + .then() + .statusCode(200); + //append does not happen without flush + RestAssured.given() + .contentType(ContentType.JSON) + .body(Map.of("fileName", AzureStorageDatalakeRoutes.FILE_NAME)) + .post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem) + .then() + .statusCode(200) + .body(Matchers.is("Hello World from Camel!")); + + LOG.info("step - datalakeFlushToFile"); + RestAssured.given() + .contentType(ContentType.JSON) + .body(Collections.emptyMap()) + .post("/azure-storage-datalake/route/datalakeFlushToFile/filesystem/" + filesystem) + 
+                    .then()
+                    .statusCode(200);
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .queryParam("useOutputStream", true)
+                    .body(Map.of("fileName", AzureStorageDatalakeRoutes.FILE_NAME))
+                    .post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body(Matchers.is("Hello World from Camel!appended"));
+
+            LOG.info("step - openQueryInputStream");
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Collections.emptyMap())
+                    .post("/azure-storage-datalake/route/openQueryInputStream/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body(Matchers.is("Hello World from Camel!appended\n"));
+
+            LOG.info("step - deleteFile");
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Collections.emptyMap())
+                    .post("/azure-storage-datalake/route/datalakeDeleteFile/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body(Matchers.is("true"));
+
+            LOG.info("step - listPaths");
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Collections.emptyMap())
+                    .post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body("", Matchers.not(Matchers.hasItem(filename)));
+
+            LOG.info("step - uploadFromFile");
+            File f = File.createTempFile("uploadFromFile", ".txt", new File(tmpFolder));
+            String content2 = UUID.randomUUID().toString();
+            Files.writeString(f.toPath(), content2);
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Map.of(DataLakeConstants.PATH, f.getAbsolutePath()))
+                    .post("/azure-storage-datalake/route/datalakeUploadFromFile/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200);
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .queryParam("useOutputStream", true)
+                    .body(Map.of("fileName", AzureStorageDatalakeRoutes.FILE_NAME2))
+                    .post("/azure-storage-datalake/route/datalakeGetFile/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body(Matchers.is(content2));
+
+            LOG.info("step - createFile");
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Map.of("fileName", "emptyFile.txt", DataLakeConstants.DIRECTORY_NAME, "emptyTest"))
+                    .post("/azure-storage-datalake/route/datalakeCreateFile/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200);
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Collections.emptyMap())
+                    .post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body("", Matchers.hasItem("test"))
+                    .body("", Matchers.hasItem("emptyTest"));
+
+            LOG.info("step - deleteDirectory");
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Map.of(DataLakeConstants.DIRECTORY_NAME, "emptyTest", "CamelAzureStorageDataLakeRecursive", true))
+                    .post("/azure-storage-datalake/route/datalakeDeleteDirectory/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200);
+            RestAssured.given()
+                    .contentType(ContentType.JSON)
+                    .body(Collections.emptyMap())
+                    .post("/azure-storage-datalake/route/datalakeListPaths/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(200)
+                    .body("", Matchers.hasItem("test"))
+                    .body("", Matchers.not(Matchers.hasItem("emptyTest")));
+
+        } finally {
+            /* Clean up */
+            RestAssured.given()
+                    .delete("/azure-storage-datalake/filesystem/" + filesystem)
+                    .then()
+                    .statusCode(204);
+        }
+
+    }
 }
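One behavior this test pins down: appendToFile alone does not change what a subsequent getFile returns; the appended bytes only become visible after flushToFile commits them (the route passes the offset through the DataLakeConstants.POSITION header). A sketch of the pair, assuming the direct: routes from AzureStorageDatalakeRoutes and illustrative header values:

    import java.util.Map;
    import org.apache.camel.ProducerTemplate;

    public class AppendFlushIdiom {
        void appendThenFlush(ProducerTemplate template) {
            Map<String, Object> headers = Map.of(
                    "accountName", "myaccount",       // illustrative
                    "filesystemName", "myfilesystem", // illustrative
                    "append", "appended");
            // stage the bytes on the server; a read still returns the old content
            template.sendBodyAndHeaders("direct:datalakeAppendToFile", null, headers);
            // commit the staged bytes; the route sets DataLakeConstants.POSITION itself
            template.sendBodyAndHeaders("direct:datalakeFlushToFile", null, headers);
        }
    }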
diff --git a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
index 27a09589b0..0417f14e54 100644
--- a/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
+++ b/integration-test-groups/azure/azure-storage-datalake/src/test/java/org/apache/camel/quarkus/component/azure/storage/datalake/it/AzureStorageDatalakeTestResource.java
@@ -17,16 +17,24 @@
 package org.apache.camel.quarkus.component.azure.storage.datalake.it;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
 import java.util.Map;
 
 import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
 import org.apache.camel.quarkus.test.mock.backend.MockBackendUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class AzureStorageDatalakeTestResource implements QuarkusTestResourceLifecycleManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(AzureStorageDatalakeTestResource.class);
 
+    private Path tmpFolder;
+
     @Override
     public Map<String, String> start() {
         String realAzureStorageAccountName = AzureStorageDatalakeUtil.getRealAccountNameFromEnv();
@@ -41,13 +49,34 @@ public class AzureStorageDatalakeTestResource implements QuarkusTestResourceLife
             MockBackendUtils.logRealBackendUsed();
         }
 
+        //create tmp folder (for routes)
+        String tmpFolderPath = null;
+        try {
+            tmpFolder = Files.createTempDirectory("CqAzureDatalakeTestTmpFolder");
+            tmpFolderPath = tmpFolder.toFile().getAbsolutePath();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
         return Map.of(
                 "azure.datalake.service.url",
-                "https://" + realAzureStorageAccountName + ".dfs.core.windows.net");
+                "https://" + realAzureStorageAccountName + ".dfs.core.windows.net",
+                "cqDatalakeTmpFolder", tmpFolderPath,
+                "cqCDatalakeConsumerFilesystem", "cqfsconsumer" + RandomStringUtils.randomNumeric(16));
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
     @Override
     public void stop() {
-        //nothing
+        if (tmpFolder != null && tmpFolder.toFile().exists()) {
+            try (var dirStream = Files.walk(tmpFolder)) {
+                dirStream
+                        .map(Path::toFile)
+                        .sorted(Comparator.reverseOrder())
+                        .forEach(File::delete);
+            } catch (IOException e) {
+                //nothing
+            }
+        }
     }
 }
diff --git a/integration-tests/azure-grouped/pom.xml b/integration-tests/azure-grouped/pom.xml
index ecb4596d0a..caf025aaab 100644
--- a/integration-tests/azure-grouped/pom.xml
+++ b/integration-tests/azure-grouped/pom.xml
@@ -76,6 +76,10 @@
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-direct</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-file</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-integration-test-support</artifactId>
@@ -271,6 +275,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-file-deployment</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-mock-deployment</artifactId>
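For reference, the temp-folder cleanup in AzureStorageDatalakeTestResource.stop() above relies on deleting children before their parent directories; sorting the walked paths in reverse order guarantees that. The same idiom standalone (a minimal sketch; the directory and file names are illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;

    public class TempDirCleanup {
        public static void main(String[] args) throws IOException {
            Path tmp = Files.createTempDirectory("demo");
            Files.createFile(tmp.resolve("child.txt"));
            // reverse order puts deeper paths first, so files are deleted
            // before the directories that contain them
            try (var paths = Files.walk(tmp)) {
                paths.sorted(Comparator.reverseOrder())
                        .map(Path::toFile)
                        .forEach(File::delete);
            }
        }
    }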