This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ebf620a5ab2d222fad10916f5ca4d6be106dae47
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 10:06:50 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../atomix/client/map/AtomixMapConsumer.java       |  2 +-
 .../client/messaging/AtomixMessagingConsumer.java  |  2 +-
 .../atomix/client/queue/AtomixQueueConsumer.java   |  2 +-
 .../atomix/client/set/AtomixSetConsumer.java       |  2 +-
 .../atomix/client/value/AtomixValueConsumer.java   |  2 +-
 .../apache/camel/component/avro/AvroEndpoint.java  | 11 -------
 .../apache/camel/component/avro/AvroListener.java  | 17 ++++++++++-
 .../azure/eventhubs/EventHubsConsumer.java         | 34 ++++++++++++++++++++--
 .../azure/eventhubs/EventHubsEndpoint.java         | 32 --------------------
 .../component/azure/storage/blob/BlobConsumer.java |  2 +-
 .../azure/storage/datalake/DataLakeConsumer.java   |  2 +-
 .../azure/storage/queue/QueueConsumer.java         | 13 ++++++++-
 .../azure/storage/queue/QueueEndpoint.java         | 10 -------
 .../component/azure/blob/BlobServiceConsumer.java  |  4 +--
 .../azure/queue/QueueServiceConsumer.java          |  5 ++--
 15 files changed, 71 insertions(+), 69 deletions(-)

diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
index d3ad3b7..854b411 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java
@@ -83,7 +83,7 @@ public final class AtomixMapConsumer extends 
AbstractAtomixClientConsumer<Atomix
     // ********************************************
 
     private void onEvent(DistributedMap.EntryEvent<Object, Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
         exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, 
event.entry().getKey());
 
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
index 4371216..fdcccae 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java
@@ -101,7 +101,7 @@ public final class AtomixMessagingConsumer extends 
AbstractAtomixClientConsumer<
     // ********************************************
 
     private void onMessage(Message<Object> message) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.MESSAGE_ID, 
message.id());
 
         if (resultHeader == null) {
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
index d90b38d..139d8a4 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java
@@ -74,7 +74,7 @@ public final class AtomixQueueConsumer extends 
AbstractAtomixClientConsumer<Atom
     // ********************************************
 
     private void onEvent(DistributedQueue.ValueEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
 
         if (resultHeader == null) {
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
index e35ad9d..10343b6 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java
@@ -74,7 +74,7 @@ public final class AtomixSetConsumer extends 
AbstractAtomixClientConsumer<Atomix
     // ********************************************
 
     private void onEvent(DistributedSet.ValueEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
 
         if (resultHeader == null) {
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
index 8fea9e6..1ec5efd 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java
@@ -73,7 +73,7 @@ public final class AtomixValueConsumer extends 
AbstractAtomixClientConsumer<Atom
     // ********************************************
 
     private void onEvent(DistributedValue.ChangeEvent<Object> event) {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
         exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, 
event.oldValue());
 
diff --git 
a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
 
b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
index d85e4dc..7ef0f54 100644
--- 
a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
+++ 
b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java
@@ -54,17 +54,6 @@ public abstract class AvroEndpoint extends DefaultEndpoint 
implements AsyncEndpo
         return false;
     }
 
-    public Exchange createExchange(Protocol.Message message, Object request) {
-        ExchangePattern pattern = ExchangePattern.InOut;
-        if (message.getResponse().getType().equals(Schema.Type.NULL)) {
-            pattern = ExchangePattern.InOnly;
-        }
-        Exchange exchange = createExchange(pattern);
-        exchange.getIn().setBody(request);
-        exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, 
message.getName());
-        return exchange;
-    }
-
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         AvroConsumer consumer = new AvroConsumer(this, processor);
diff --git 
a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
 
b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
index 72d330b..d09b92b 100644
--- 
a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
+++ 
b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java
@@ -28,6 +28,7 @@ import org.apache.avro.ipc.netty.NettyServer;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.avro.specific.SpecificData;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.commons.lang3.StringUtils;
 import org.eclipse.jetty.util.log.Log;
@@ -181,7 +182,7 @@ public class AvroListener {
      */
     private static Object processExchange(AvroConsumer consumer, 
Protocol.Message message, Object params) throws Exception {
         Object response;
-        Exchange exchange = consumer.getEndpoint().createExchange(message, 
params);
+        Exchange exchange = createExchange(consumer, message, params);
 
         try {
             consumer.getProcessor().process(exchange);
@@ -206,4 +207,18 @@ public class AvroListener {
         }
         return response;
     }
+
+    protected static Exchange createExchange(AvroConsumer consumer, 
Protocol.Message message, Object request) {
+        ExchangePattern pattern = ExchangePattern.InOut;
+        if (message.getResponse().getType().equals(Schema.Type.NULL)) {
+            pattern = ExchangePattern.InOnly;
+        }
+        Exchange exchange = consumer.createExchange(true);
+        exchange.setPattern(pattern);
+        exchange.getIn().setBody(request);
+        exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, 
message.getName());
+        return exchange;
+    }
+
+
 }
diff --git 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 0d27c8e..56ccee6 100644
--- 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -21,6 +21,7 @@ import com.azure.messaging.eventhubs.models.ErrorContext;
 import com.azure.messaging.eventhubs.models.EventContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import 
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
 import org.apache.camel.spi.Synchronization;
@@ -71,8 +72,37 @@ public class EventHubsConsumer extends DefaultConsumer {
         return (EventHubsEndpoint) super.getEndpoint();
     }
 
+    private Exchange createAzureEventHubExchange(final EventContext 
eventContext) {
+        final Exchange exchange = createExchange(true);
+        final Message message = exchange.getIn();
+
+        // set body as byte[] and let camel typeConverters do the job to 
convert
+        message.setBody(eventContext.getEventData().getBody());
+        // set headers
+        message.setHeader(EventHubsConstants.PARTITION_ID, 
eventContext.getPartitionContext().getPartitionId());
+        message.setHeader(EventHubsConstants.PARTITION_KEY, 
eventContext.getEventData().getPartitionKey());
+        message.setHeader(EventHubsConstants.OFFSET, 
eventContext.getEventData().getOffset());
+        message.setHeader(EventHubsConstants.ENQUEUED_TIME, 
eventContext.getEventData().getEnqueuedTime());
+        message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, 
eventContext.getEventData().getSequenceNumber());
+
+        return exchange;
+    }
+
+    private Exchange createAzureEventHubExchange(final ErrorContext 
errorContext) {
+        final Exchange exchange = createExchange(true);
+        final Message message = exchange.getIn();
+
+        // set headers
+        message.setHeader(EventHubsConstants.PARTITION_ID, 
errorContext.getPartitionContext().getPartitionId());
+
+        // set exception
+        exchange.setException(errorContext.getThrowable());
+
+        return exchange;
+    }
+
     private void onEventListener(final EventContext eventContext) {
-        final Exchange exchange = 
getEndpoint().createAzureEventHubExchange(eventContext);
+        final Exchange exchange = createAzureEventHubExchange(eventContext);
 
         // add exchange callback
         exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
@@ -93,7 +123,7 @@ public class EventHubsConsumer extends DefaultConsumer {
     }
 
     private void onErrorListener(final ErrorContext errorContext) {
-        final Exchange exchange = 
getEndpoint().createAzureEventHubExchange(errorContext);
+        final Exchange exchange = createAzureEventHubExchange(errorContext);
 
         // log exception if an exception occurred and was not handled
         if (exchange.getException() != null) {
diff --git 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
index 061a277..b9010e8 100644
--- 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
+++ 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
@@ -16,13 +16,9 @@
  */
 package org.apache.camel.component.azure.eventhubs;
 
-import com.azure.messaging.eventhubs.models.ErrorContext;
-import com.azure.messaging.eventhubs.models.EventContext;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
@@ -71,32 +67,4 @@ public class EventHubsEndpoint extends DefaultEndpoint {
         this.configuration = configuration;
     }
 
-    public Exchange createAzureEventHubExchange(final EventContext 
eventContext) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        // set body as byte[] and let camel typeConverters do the job to 
convert
-        message.setBody(eventContext.getEventData().getBody());
-        // set headers
-        message.setHeader(EventHubsConstants.PARTITION_ID, 
eventContext.getPartitionContext().getPartitionId());
-        message.setHeader(EventHubsConstants.PARTITION_KEY, 
eventContext.getEventData().getPartitionKey());
-        message.setHeader(EventHubsConstants.OFFSET, 
eventContext.getEventData().getOffset());
-        message.setHeader(EventHubsConstants.ENQUEUED_TIME, 
eventContext.getEventData().getEnqueuedTime());
-        message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, 
eventContext.getEventData().getSequenceNumber());
-
-        return exchange;
-    }
-
-    public Exchange createAzureEventHubExchange(final ErrorContext 
errorContext) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        // set headers
-        message.setHeader(EventHubsConstants.PARTITION_ID, 
errorContext.getPartitionContext().getPartitionId());
-
-        // set exception
-        exchange.setException(errorContext.getThrowable());
-
-        return exchange;
-    }
 }
diff --git 
a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
 
b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
index e8bd3e2..00e9490 100644
--- 
a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
+++ 
b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
@@ -81,7 +81,7 @@ public class BlobConsumer extends 
ScheduledBatchPollingConsumer {
         final BlobClientWrapper clientWrapper
                 = new 
BlobClientWrapper(blobContainerClient.getBlobClient(blobName));
         final BlobOperations operations = new 
BlobOperations(getEndpoint().getConfiguration(), clientWrapper);
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         BlobOperationResponse response;
         if 
(!ObjectHelper.isEmpty(getEndpoint().getConfiguration().getFileDir())) {
diff --git 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
index 3785ed5..915f7ee 100644
--- 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
+++ 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
@@ -102,7 +102,7 @@ class DataLakeConsumer extends 
ScheduledBatchPollingConsumer {
         final DataLakeFileClientWrapper clientWrapper
                 = new 
DataLakeFileClientWrapper(dataLakeFileSystemClient.getFileClient(fileName));
         final DataLakeFileOperations operations = new 
DataLakeFileOperations(getEndpoint().getConfiguration(), clientWrapper);
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         DataLakeOperationResponse response;
 
diff --git 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index 28d2428..38d057f 100644
--- 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++ 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -27,6 +27,7 @@ import com.azure.storage.queue.models.QueueMessageItem;
 import com.azure.storage.queue.models.QueueStorageException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import 
org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper;
 import 
org.apache.camel.component.azure.storage.queue.operations.QueueOperations;
@@ -79,7 +80,7 @@ public class QueueConsumer extends 
ScheduledBatchPollingConsumer {
     private Queue<Exchange> createExchanges(final List<QueueMessageItem> 
messageItems) {
         return messageItems
                 .stream()
-                .map(queueMessageItem -> 
getEndpoint().createExchange(queueMessageItem))
+                .map(this::createExchange)
                 .collect(Collectors.toCollection(LinkedList::new));
     }
 
@@ -142,6 +143,16 @@ public class QueueConsumer extends 
ScheduledBatchPollingConsumer {
         return total;
     }
 
+    private Exchange createExchange(final QueueMessageItem messageItem) {
+        final Exchange exchange = createExchange(true);
+        final Message message = exchange.getIn();
+
+        message.setBody(messageItem.getMessageText());
+        
message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap());
+
+        return exchange;
+    }
+
     /**
      * Strategy to delete the message after being processed.
      *
diff --git 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
index 525ae03..4b92bf1 100644
--- 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
+++ 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java
@@ -70,16 +70,6 @@ public class QueueEndpoint extends DefaultEndpoint {
                 ? configuration.getServiceClient() : 
QueueClientFactory.createQueueServiceClient(configuration);
     }
 
-    public Exchange createExchange(final QueueMessageItem messageItem) {
-        final Exchange exchange = createExchange();
-        final Message message = exchange.getIn();
-
-        message.setBody(messageItem.getMessageText());
-        
message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap());
-
-        return exchange;
-    }
-
     /**
      * The component configurations
      */
diff --git 
a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
 
b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
index f3fc7b7..74a3638 100644
--- 
a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
+++ 
b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java
@@ -35,10 +35,10 @@ public class BlobServiceConsumer extends 
ScheduledPollConsumer {
 
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = super.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         try {
             getBlob(exchange);
-            super.getAsyncProcessor().process(exchange);
+            getProcessor().process(exchange);
             return 1;
         } catch (StorageException ex) {
             if (404 == ex.getHttpStatusCode()) {
diff --git 
a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
 
b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
index 5d665b8..4154b93 100644
--- 
a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
+++ 
b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
@@ -33,10 +33,10 @@ public class QueueServiceConsumer extends 
ScheduledPollConsumer {
 
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = super.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         try {
             retrieveMessage(exchange);
-            super.getAsyncProcessor().process(exchange);
+            getProcessor().process(exchange);
             return 1;
         } catch (StorageException ex) {
             if (404 == ex.getHttpStatusCode()) {
@@ -51,7 +51,6 @@ public class QueueServiceConsumer extends 
ScheduledPollConsumer {
         //TODO: Support the batch processing if needed, given that it is 
possible
         // to retrieve more than 1 message in one go, similarly to 
camel-aws/s3 consumer. 
         QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
-
     }
 
     protected QueueServiceConfiguration getConfiguration() {

Reply via email to