This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push: new fc9c06f Azure EventHubs test fixup fc9c06f is described below commit fc9c06f6d45cff6c857f1423d38a501567682f56 Author: Peter Palaga <ppal...@redhat.com> AuthorDate: Thu Feb 18 22:08:00 2021 +0100 Azure EventHubs test fixup --- integration-tests/azure-eventhubs/README.adoc | 27 ++++++- .../azure-eventhubs/azure-resources.sh | 82 ++++++++++++++++++++++ integration-tests/azure-eventhubs/pom.xml | 12 ++++ .../azure/eventhubs/it/AzureEventhubsResource.java | 46 ++++++++---- ...hubsResource.java => AzureEventhubsRoutes.java} | 50 +++---------- 5 files changed, 159 insertions(+), 58 deletions(-) diff --git a/integration-tests/azure-eventhubs/README.adoc b/integration-tests/azure-eventhubs/README.adoc index 452c326..55b8163 100644 --- a/integration-tests/azure-eventhubs/README.adoc +++ b/integration-tests/azure-eventhubs/README.adoc @@ -4,10 +4,23 @@ By default the integration tests run only against the real remote Azure API. Prerequisites: -* A https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&tabs=azure-portal[general-purpose v2 Azure storage account] and [create a container] +* A https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&tabs=azure-portal[general-purpose v2 Azure storage account] and +https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-portal[create a container] * An https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create[Azure Event Hub] -* An https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string[Event Hubs connection string] and set the following environment variables: -+ +* An https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string[Event Hubs connection string] + +To create all of the above, you can use `azure-resources.sh` script as follows: + +[source,shell] +---- +$ ./azure-resources.sh create +---- + +The script outputs a set of export commands that you may want to paste to your shell, +or to your `.bashrc`. + +Here are the environment variables you need to set: + [source,shell] ---- export AZURE_STORAGE_ACCOUNT_NAME=<your-azure-storage-account-name> @@ -16,3 +29,11 @@ export AZURE_STORAGE_ACCOUNT_KEY=<your-azure-storage-account-key> export AZURE_BLOB_CONTAINER_NAME=<your-container-name> export AZURE_EVENT_HUBS_CONNECTION_STRING="Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>;EntityPath=<your-hub-name>" ---- + +To clean up, run + +[source,shell] +---- +$ ./azure-resources.sh delete +---- + diff --git a/integration-tests/azure-eventhubs/azure-resources.sh b/integration-tests/azure-eventhubs/azure-resources.sh new file mode 100755 index 0000000..fd7cc46 --- /dev/null +++ b/integration-tests/azure-eventhubs/azure-resources.sh @@ -0,0 +1,82 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +suffix="$(az ad signed-in-user show --query displayName -o tsv | tr '[:upper:]' '[:lower:]' | tr -cd '[:alnum:]' | cut -c-12)" +suffix="${suffix}1" +export AZURE_STORAGE_ACCOUNT_NAME=cqacc${suffix} +export AZURE_BLOB_CONTAINER_NAME=cq-container-${suffix} + +export RESOURCE_GROUP=cq-res-group-${suffix} +export ZONE=westeurope +export EH_NAMESPACE=cq-eh-namenspace-${suffix} +export EH_NAME=cq-event-hub-${suffix} + +function createResources() { + set -e + set -x + az group create --name ${RESOURCE_GROUP} --location ${ZONE} + + az storage account create --name ${AZURE_STORAGE_ACCOUNT_NAME} --resource-group ${RESOURCE_GROUP} --location ${ZONE} --sku Standard_LRS --kind StorageV2 + + SUBSCRIPTION_ID="$(az account list --query '[0].id' -o tsv)" + USER_ID="$(az ad signed-in-user show --query objectId -o tsv)" + az role assignment create --role "Storage Blob Data Contributor" --assignee ${USER_ID} --scope "/subscriptions/${SUBSCRIPTION_ID}/resourceGroups/${RESOURCE_GROUP}/providers/Microsoft.Storage/storageAccounts/${AZURE_STORAGE_ACCOUNT_NAME}" + + sleep 30 + + az storage container create --account-name ${AZURE_STORAGE_ACCOUNT_NAME} --name ${AZURE_BLOB_CONTAINER_NAME} --auth-mode login + + az eventhubs namespace create --name ${EH_NAMESPACE} --resource-group ${RESOURCE_GROUP} --location ${ZONE} + az eventhubs eventhub create --name ${EH_NAME} --resource-group ${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --partition-count 1 + + AZURE_EVENT_HUBS_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list --resource-group ${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --name RootManageSharedAccessKey --query primaryConnectionString -o tsv) + + AZURE_STORAGE_ACCOUNT_KEY=$(az storage account keys list --account-name ${AZURE_STORAGE_ACCOUNT_NAME} --query '[0].value' -o tsv) + + set +x + echo "Add the following to your environment:" + + echo 'export AZURE_STORAGE_ACCOUNT_NAME="'${AZURE_STORAGE_ACCOUNT_NAME}'"' + echo 'export AZURE_STORAGE_ACCOUNT_KEY="'${AZURE_STORAGE_ACCOUNT_KEY}'"' + echo 'export AZURE_BLOB_CONTAINER_NAME="'${AZURE_BLOB_CONTAINER_NAME}'"' + echo 'export AZURE_EVENT_HUBS_CONNECTION_STRING="'$AZURE_EVENT_HUBS_CONNECTION_STRING';EntityPath='${EH_NAME}'"' +} + + +function deleteResources() { + set -x + set +e + az eventhubs eventhub delete --name ${EH_NAME} --resource-group ${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} + az eventhubs namespace delete --name ${EH_NAMESPACE} --resource-group ${RESOURCE_GROUP} + az storage container delete --account-name ${AZURE_STORAGE_ACCOUNT_NAME} --name ${AZURE_BLOB_CONTAINER_NAME} + az storage account delete --name ${AZURE_STORAGE_ACCOUNT_NAME} --resource-group ${RESOURCE_GROUP} --yes + az group delete --name ${RESOURCE_GROUP} --yes +} + +case "$1" in +create) echo "Creating Azure resources" + createResources + ;; +delete) echo "Deleting Azure resources" + deleteResources + ;; +*) echo "usage: $0 [create|delete]" + ;; +esac + diff --git a/integration-tests/azure-eventhubs/pom.xml b/integration-tests/azure-eventhubs/pom.xml index f85884b..d1f74c9 100644 --- a/integration-tests/azure-eventhubs/pom.xml +++ b/integration-tests/azure-eventhubs/pom.xml @@ -61,9 +61,21 @@ <artifactId>camel-quarkus-main</artifactId> </dependency> <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-mock</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-quartz</artifactId> + </dependency> + <dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-resteasy</artifactId> </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jackson</artifactId> + </dependency> <!-- test dependencies --> <dependency> diff --git a/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java b/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java index a19aeb8..b9da58b 100644 --- a/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java +++ b/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java @@ -17,6 +17,8 @@ package org.apache.camel.quarkus.component.azure.eventhubs.it; import java.net.URI; +import java.util.List; +import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; @@ -28,8 +30,12 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import io.quarkus.scheduler.Scheduled; +import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; +import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; import org.eclipse.microprofile.config.inject.ConfigProperty; @Path("/azure-eventhubs") @@ -42,6 +48,9 @@ public class AzureEventhubsResource { @Inject ConsumerTemplate consumerTemplate; + @Inject + CamelContext context; + @ConfigProperty(name = "azure.storage.account-name") String azureStorageAccountName; @@ -54,18 +63,30 @@ public class AzureEventhubsResource { @ConfigProperty(name = "azure.blob.container.name") String azureBlobContainerName; + private volatile String message; + private int counter = 0; + + /** + * For some reason if we send just a single message, it is not always received by the consumer. + * Sending multiple messages seems to be more reliable. + */ + @Scheduled(every = "1s") + void schedule() { + if (message != null) { + final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString + ")"; + producerTemplate.sendBody(endpointUri, message + (counter++)); + } + } + @Path("/receive-events") @GET - @Produces(MediaType.TEXT_PLAIN) - public String receiveEvents() throws Exception { - - final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString - + ")&blobAccountName=RAW(" + azureStorageAccountName - + ")&blobAccessKey=RAW(" + azureStorageAccountKey - + ")&blobContainerName=RAW(" + azureBlobContainerName + ")"; - return consumerTemplate.receiveBody(endpointUri, - 10000L, - String.class); + @Produces(MediaType.APPLICATION_JSON) + public List<String> receiveEvents() throws Exception { + final MockEndpoint mockEndpoint = context.getEndpoint("mock:azure-consumed", MockEndpoint.class); + return mockEndpoint.getReceivedExchanges().stream() + .map(Exchange::getMessage) + .map(m -> m.getBody(String.class)) + .collect(Collectors.toList()); } @Path("/send-events") @@ -73,10 +94,7 @@ public class AzureEventhubsResource { @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.TEXT_PLAIN) public Response sendEvents(String body) throws Exception { - - final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString + ")"; - - producerTemplate.sendBody(endpointUri, body); + this.message = body; // start sending the messages via schedule() return Response.created(new URI("https://camel.apache.org/")).build(); } diff --git a/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java b/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java similarity index 55% copy from integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java copy to integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java index a19aeb8..0a98d3c 100644 --- a/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java +++ b/integration-tests/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java @@ -16,31 +16,14 @@ */ package org.apache.camel.quarkus.component.azure.eventhubs.it; -import java.net.URI; - import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import org.apache.camel.ConsumerTemplate; -import org.apache.camel.ProducerTemplate; +import com.azure.core.amqp.AmqpTransportType; +import org.apache.camel.builder.RouteBuilder; import org.eclipse.microprofile.config.inject.ConfigProperty; -@Path("/azure-eventhubs") @ApplicationScoped -public class AzureEventhubsResource { - - @Inject - ProducerTemplate producerTemplate; - - @Inject - ConsumerTemplate consumerTemplate; +public class AzureEventhubsRoutes extends RouteBuilder { @ConfigProperty(name = "azure.storage.account-name") String azureStorageAccountName; @@ -54,30 +37,15 @@ public class AzureEventhubsResource { @ConfigProperty(name = "azure.blob.container.name") String azureBlobContainerName; - @Path("/receive-events") - @GET - @Produces(MediaType.TEXT_PLAIN) - public String receiveEvents() throws Exception { - - final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString + @Override + public void configure() throws Exception { + from("azure-eventhubs:?connectionString=RAW(" + connectionString + ")&blobAccountName=RAW(" + azureStorageAccountName + ")&blobAccessKey=RAW(" + azureStorageAccountKey - + ")&blobContainerName=RAW(" + azureBlobContainerName + ")"; - return consumerTemplate.receiveBody(endpointUri, - 10000L, - String.class); - } - - @Path("/send-events") - @POST - @Produces(MediaType.TEXT_PLAIN) - @Consumes(MediaType.TEXT_PLAIN) - public Response sendEvents(String body) throws Exception { - - final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString + ")"; + + ")&blobContainerName=RAW(" + azureBlobContainerName + ")&amqpTransportType=" + + AmqpTransportType.AMQP) + .to("mock:azure-consumed"); - producerTemplate.sendBody(endpointUri, body); - return Response.created(new URI("https://camel.apache.org/")).build(); } }