This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9780179  CAMEL-15517: Support iterable of events in Azure EventHubs 
component (#4179)
9780179 is described below

commit 97801790ece61614555b7b32d57a8c345134c530
Author: Omar Al-Safi <omars...@gmail.com>
AuthorDate: Wed Sep 9 13:50:54 2020 +0200

    CAMEL-15517: Support iterable of events in Azure EventHubs component (#4179)
---
 .../component/azure/eventhubs/azure-eventhubs.json |   2 +-
 .../src/main/docs/azure-eventhubs-component.adoc   |  16 +++-
 .../operations/EventHubsProducerOperations.java    |  48 +++++++++-
 .../operations/EventHubsProducerOperationsIT.java  | 105 ++++++++++++++++++++-
 4 files changed, 161 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
 
b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
index b564024..7836480 100644
--- 
a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
+++ 
b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
@@ -40,7 +40,7 @@
     "partitionId": { "kind": "property", "displayName": "Partition Id", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Sets the identifier of 
the Event Hub partition that the {link EventData events} will be sent to. If 
the identifier is not spe [...]
     "partitionKey": { "kind": "property", "displayName": "Partition Key", 
"group": "producer", "label": "producer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Sets a hashing key to be 
provided for the batch of events, which instructs the Event Hubs service to map 
this key to a spec [...]
     "producerAsyncClient": { "kind": "property", "displayName": "Producer 
Async Client", "group": "producer", "label": "producer", "required": false, 
"type": "object", "javaType": 
"com.azure.messaging.eventhubs.EventHubProducerAsyncClient", "deprecated": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Sets the 
EventHubProducerAsyncClient.An asynchronous producer respo [...]
-    "basicPropertyBinding": { "kind": "property", "displayName": "Basic 
Property Binding", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": true, "secret": false, 
"defaultValue": false, "description": "Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities" },
+    "basicPropertyBinding": { "kind": "property", "displayName": "Basic 
Property Binding", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, 
"defaultValue": false, "description": "Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities" },
     "connectionString": { "kind": "property", "displayName": "Connection 
String", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": true, 
"configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "Instead of supplying 
namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply the 
connection string [...]
     "sharedAccessKey": { "kind": "property", "displayName": "Shared Access 
Key", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": true, 
"configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The generated value for 
the SharedAccessName" },
     "sharedAccessName": { "kind": "property", "displayName": "Shared Access 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration", 
"configurationField": "configuration", "description": "The name you chose for 
your EventHubs SAS keys" }
diff --git 
a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc 
b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
index ce08321..ca0c6cb 100644
--- 
a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
+++ 
b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
@@ -133,7 +133,7 @@ The Azure Event Hubs component supports 21 options, which 
are listed below.
 | *partitionId* (producer) | Sets the identifier of the Event Hub partition 
that the {link EventData events} will be sent to. If the identifier is not 
specified, the Event Hubs service will be responsible for routing events that 
are sent to an available partition. |  | String
 | *partitionKey* (producer) | Sets a hashing key to be provided for the batch 
of events, which instructs the Event Hubs service to map this key to a specific 
partition. The selection of a partition is stable for a given partition hashing 
key. Should any other batches of events be sent using the same exact partition 
hashing key, the Event Hubs service will route them all to the same partition. 
This should be specified only when there is a need to group events by 
partition, but there is fl [...]
 | *producerAsyncClient* (producer) | Sets the EventHubProducerAsyncClient.An 
asynchronous producer responsible for transmitting EventData to a specific 
Event Hub, grouped together in batches. Depending on the {link 
CreateBatchOptions options} specified when creating an \{linkEventDataBatch\}, 
the events may be automatically routed to an available partition or specific to 
a partition. Use by this component to produce the data in camel producer. |  | 
EventHubProducerAsyncClient
-| *basicPropertyBinding* (advanced) | *Deprecated* Whether the component 
should use basic property binding (Camel 2.x) or the newer property binding 
with additional capabilities | false | boolean
+| *basicPropertyBinding* (advanced) | Whether the component should use basic 
property binding (Camel 2.x) or the newer property binding with additional 
capabilities | false | boolean
 | *connectionString* (security) | Instead of supplying namespace, 
sharedAccessKey, sharedAccessName ... etc, you can just supply the connection 
string for your eventHub. The connection string for EventHubs already include 
all the necessary information to connection to your EventHub. To learn on how 
to generate the connection string, take a look at this documentation: 
\https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
 |  | String
 | *sharedAccessKey* (security) | The generated value for the SharedAccessName 
|  | String
 | *sharedAccessName* (security) | The name you chose for your EventHubs SAS 
keys |  | String
@@ -228,6 +228,20 @@ from("direct:start")
 .to("azure-eventhubs:?connectionString=RAW({{connectionString}})"
 ```
 
+Also, the component supports as well *aggregation* of messages by sending 
events as *iterable* of either Exchanges/Messages or normal data (e.g: list of 
Strings). For example:
+```
+from("direct:start")
+.process(exchange -> {
+        final List<String> messages = new LinkedList<>();
+        messages.add("Test String Message 1");
+        messages.add("Test String Message 2");
+
+        exchange.getIn().setHeader(EventHubsConstants.PARTITION_ID, 
firstPartition);
+        exchange.getIn().setBody(messages);
+})
+.to("azure-eventhubs:?connectionString=RAW({{connectionString}})"
+```
+
 === Development Notes (Important)
 When developing on this component, you will need to obtain your Azure 
accessKey in order to run the integration tests. In addition to the mocked unit 
tests
 you *will need to run the integration tests with every change you make or even 
client upgrade as the Azure client can break things even on minor versions 
upgrade.*
diff --git 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
index 9183f03..4037ba2 100644
--- 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
+++ 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
@@ -17,12 +17,16 @@
 package org.apache.camel.component.azure.eventhubs.operations;
 
 import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 
 import com.azure.messaging.eventhubs.EventData;
 import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
 import com.azure.messaging.eventhubs.models.SendOptions;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
 import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
 import 
org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
 import org.apache.camel.util.ObjectHelper;
@@ -97,16 +101,52 @@ public class EventHubsProducerOperations {
                 .setPartitionKey(partitionKey);
     }
 
+    @SuppressWarnings("unchecked")
     private Iterable<EventData> createEventData(final Exchange exchange) {
-        final byte[] data = exchange.getIn().getBody(byte[].class);
+        // check if our exchange is list or contain some values
+        if (exchange.getIn().getBody() instanceof Iterable) {
+            return createEventDataFromIterable((Iterable<Object>) 
exchange.getIn().getBody(),
+                    exchange.getContext().getTypeConverter());
+        }
+
+        // we have only a single event here
+        return 
Collections.singletonList(createEventDataFromExchange(exchange));
+    }
+
+    private Iterable<EventData> createEventDataFromIterable(final 
Iterable<Object> inputData, final TypeConverter converter) {
+        final List<EventData> finalEventData = new LinkedList<>();
+
+        inputData.forEach(data -> {
+            if (data instanceof Exchange) {
+                finalEventData.add(createEventDataFromExchange((Exchange) 
data));
+            } else if (data instanceof Message) {
+                finalEventData.add(createEventDataFromMessage((Message) data));
+            } else {
+                finalEventData.add(createEventDataFromObject(data, converter));
+            }
+        });
+
+        return finalEventData;
+    }
+
+    private EventData createEventDataFromExchange(final Exchange exchange) {
+        return createEventDataFromMessage(exchange.getIn());
+    }
+
+    private EventData createEventDataFromMessage(final Message message) {
+        return createEventDataFromObject(message.getBody(), 
message.getExchange().getContext().getTypeConverter());
+    }
+
+    private EventData createEventDataFromObject(final Object inputData, final 
TypeConverter converter) {
+        final byte[] data = converter.convertTo(byte[].class, inputData);
 
         if (ObjectHelper.isEmpty(data)) {
             throw new IllegalArgumentException(
                     String.format("Cannot convert message body %s to byte[]. 
You will need "
                                   + "to make sure the data encoded in byte[] 
or add a Camel TypeConverter to convert the data to byte[]",
-                            exchange.getIn().getBody()));
+                            inputData));
         }
-        // for now we only support single event
-        return Collections.singletonList(new EventData(data));
+
+        return new EventData(data);
     }
 }
diff --git 
a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
 
b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
index 03e09f0..e5d7960 100644
--- 
a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
+++ 
b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.azure.eventhubs.operations;
 
 import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -32,6 +34,7 @@ import 
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
@@ -40,6 +43,8 @@ import org.junit.jupiter.api.TestInstance;
 class EventHubsProducerOperationsIT extends CamelTestSupport {
 
     private EventHubsConfiguration configuration;
+    private EventHubProducerAsyncClient producerAsyncClient;
+    private EventHubConsumerAsyncClient consumerAsyncClient;
 
     @BeforeAll
     public void prepare() throws Exception {
@@ -48,15 +53,14 @@ class EventHubsProducerOperationsIT extends 
CamelTestSupport {
         configuration = new EventHubsConfiguration();
         
configuration.setConnectionString(properties.getProperty("connectionString"));
         
configuration.setConsumerGroupName(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME);
+
+        producerAsyncClient = 
EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
+        consumerAsyncClient = 
EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
     }
 
     @Test
     public void testSendEventWithSpecificPartition() {
-        final EventHubProducerAsyncClient producerAsyncClient
-                = 
EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
         final EventHubsProducerOperations operations = new 
EventHubsProducerOperations(producerAsyncClient, configuration);
-        final EventHubConsumerAsyncClient consumerAsyncClient
-                = 
EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
         final String firstPartition = 
producerAsyncClient.getPartitionIds().blockLast();
         final Exchange exchange = new DefaultExchange(context);
 
@@ -84,7 +88,100 @@ class EventHubsProducerOperationsIT extends 
CamelTestSupport {
 
                     return eventExists;
                 });
+    }
+
+    @Test
+    public void testIterableExchangesSendEventsWithSpecificPartition() {
+        final EventHubsProducerOperations operations = new 
EventHubsProducerOperations(producerAsyncClient, configuration);
+        final String firstPartition = 
producerAsyncClient.getPartitionIds().blockLast();
+
+        final Exchange exchange1 = new DefaultExchange(context);
+        final Exchange exchange2 = new DefaultExchange(context);
+
+        exchange1.getIn().setBody("Exchange Message 1");
+        exchange2.getIn().setBody("Exchange Message 2");
+
+        final List<Exchange> exchanges = new LinkedList<>();
+        exchanges.add(exchange1);
+        exchanges.add(exchange2);
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(exchanges);
+
+        operations.sendEvents(exchange, doneSync -> {
+        });
+
+        Awaitility.await()
+                .atMost(40, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(2))
+                .pollInterval(Duration.ofSeconds(2))
+                .until(() -> {
+                    final Boolean event1Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, 
EventPosition.earliest())
+                            .any(partitionEvent -> 
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && 
partitionEvent.getData().getBodyAsString()
+                                            .contains("Exchange Message 1"))
+                            .block();
+
+                    final Boolean event2Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, 
EventPosition.earliest())
+                            .any(partitionEvent -> 
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && 
partitionEvent.getData().getBodyAsString()
+                                            .contains("Exchange Message 2"))
+                            .block();
+
+                    if (event1Exists == null || event2Exists == null) {
+                        return false;
+                    }
+
+                    return event1Exists && event2Exists;
+                });
+    }
+
+    @Test
+    public void testIterableStringSendEventsWithSpecificPartition() {
+        final EventHubsProducerOperations operations = new 
EventHubsProducerOperations(producerAsyncClient, configuration);
+        final String firstPartition = 
producerAsyncClient.getPartitionIds().blockLast();
+
+        final List<String> messages = new LinkedList<>();
+        messages.add("Test String Message 1");
+        messages.add("Test String Message 2");
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(messages);
+
+        operations.sendEvents(exchange, doneSync -> {
+        });
+
+        Awaitility.await()
+                .atMost(40, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(2))
+                .pollInterval(Duration.ofSeconds(2))
+                .until(() -> {
+                    final Boolean event1Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, 
EventPosition.earliest())
+                            .any(partitionEvent -> 
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && 
partitionEvent.getData().getBodyAsString()
+                                            .contains("Test String Message 1"))
+                            .block();
+
+                    final Boolean event2Exists = consumerAsyncClient
+                            .receiveFromPartition(firstPartition, 
EventPosition.earliest())
+                            .any(partitionEvent -> 
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+                                    && 
partitionEvent.getData().getBodyAsString()
+                                            .contains("Test String Message 2"))
+                            .block();
+
+                    if (event1Exists == null || event2Exists == null) {
+                        return false;
+                    }
+
+                    return event1Exists && event2Exists;
+                });
+    }
 
+    @AfterAll
+    public void tearDown() {
         producerAsyncClient.close();
         consumerAsyncClient.close();
     }

Reply via email to