This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit ebf620a5ab2d222fad10916f5ca4d6be106dae47 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Feb 21 10:06:50 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../atomix/client/map/AtomixMapConsumer.java | 2 +- .../client/messaging/AtomixMessagingConsumer.java | 2 +- .../atomix/client/queue/AtomixQueueConsumer.java | 2 +- .../atomix/client/set/AtomixSetConsumer.java | 2 +- .../atomix/client/value/AtomixValueConsumer.java | 2 +- .../apache/camel/component/avro/AvroEndpoint.java | 11 ------- .../apache/camel/component/avro/AvroListener.java | 17 ++++++++++- .../azure/eventhubs/EventHubsConsumer.java | 34 ++++++++++++++++++++-- .../azure/eventhubs/EventHubsEndpoint.java | 32 -------------------- .../component/azure/storage/blob/BlobConsumer.java | 2 +- .../azure/storage/datalake/DataLakeConsumer.java | 2 +- .../azure/storage/queue/QueueConsumer.java | 13 ++++++++- .../azure/storage/queue/QueueEndpoint.java | 10 ------- .../component/azure/blob/BlobServiceConsumer.java | 4 +-- .../azure/queue/QueueServiceConsumer.java | 5 ++-- 15 files changed, 71 insertions(+), 69 deletions(-) diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java index d3ad3b7..854b411 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixMapConsumer.java @@ -83,7 +83,7 @@ public final class AtomixMapConsumer extends AbstractAtomixClientConsumer<Atomix // ******************************************** private void onEvent(DistributedMap.EntryEvent<Object, Object> event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, event.entry().getKey()); diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java index 4371216..fdcccae 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/messaging/AtomixMessagingConsumer.java @@ -101,7 +101,7 @@ public final class AtomixMessagingConsumer extends AbstractAtomixClientConsumer< // ******************************************** private void onMessage(Message<Object> message) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(AtomixClientConstants.MESSAGE_ID, message.id()); if (resultHeader == null) { diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java index d90b38d..139d8a4 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/queue/AtomixQueueConsumer.java @@ -74,7 +74,7 @@ public final class AtomixQueueConsumer extends AbstractAtomixClientConsumer<Atom // ******************************************** private void onEvent(DistributedQueue.ValueEvent<Object> event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); if (resultHeader == null) { diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java index e35ad9d..10343b6 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/set/AtomixSetConsumer.java @@ -74,7 +74,7 @@ public final class AtomixSetConsumer extends AbstractAtomixClientConsumer<Atomix // ******************************************** private void onEvent(DistributedSet.ValueEvent<Object> event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); if (resultHeader == null) { diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java index 8fea9e6..1ec5efd 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/value/AtomixValueConsumer.java @@ -73,7 +73,7 @@ public final class AtomixValueConsumer extends AbstractAtomixClientConsumer<Atom // ******************************************** private void onEvent(DistributedValue.ChangeEvent<Object> event) { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, event.oldValue()); diff --git a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java index d85e4dc..7ef0f54 100644 --- a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java +++ b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroEndpoint.java @@ -54,17 +54,6 @@ public abstract class AvroEndpoint extends DefaultEndpoint implements AsyncEndpo return false; } - public Exchange createExchange(Protocol.Message message, Object request) { - ExchangePattern pattern = ExchangePattern.InOut; - if (message.getResponse().getType().equals(Schema.Type.NULL)) { - pattern = ExchangePattern.InOnly; - } - Exchange exchange = createExchange(pattern); - exchange.getIn().setBody(request); - exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, message.getName()); - return exchange; - } - @Override public Consumer createConsumer(Processor processor) throws Exception { AvroConsumer consumer = new AvroConsumer(this, processor); diff --git a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java index 72d330b..d09b92b 100644 --- a/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java +++ b/components/camel-avro-rpc/src/main/java/org/apache/camel/component/avro/AvroListener.java @@ -28,6 +28,7 @@ import org.apache.avro.ipc.netty.NettyServer; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.avro.specific.SpecificData; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.support.ExchangeHelper; import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.util.log.Log; @@ -181,7 +182,7 @@ public class AvroListener { */ private static Object processExchange(AvroConsumer consumer, Protocol.Message message, Object params) throws Exception { Object response; - Exchange exchange = consumer.getEndpoint().createExchange(message, params); + Exchange exchange = createExchange(consumer, message, params); try { consumer.getProcessor().process(exchange); @@ -206,4 +207,18 @@ public class AvroListener { } return response; } + + protected static Exchange createExchange(AvroConsumer consumer, Protocol.Message message, Object request) { + ExchangePattern pattern = ExchangePattern.InOut; + if (message.getResponse().getType().equals(Schema.Type.NULL)) { + pattern = ExchangePattern.InOnly; + } + Exchange exchange = consumer.createExchange(true); + exchange.setPattern(pattern); + exchange.getIn().setBody(request); + exchange.getIn().setHeader(AvroConstants.AVRO_MESSAGE_NAME, message.getName()); + return exchange; + } + + } diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index 0d27c8e..56ccee6 100644 --- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -21,6 +21,7 @@ import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory; import org.apache.camel.spi.Synchronization; @@ -71,8 +72,37 @@ public class EventHubsConsumer extends DefaultConsumer { return (EventHubsEndpoint) super.getEndpoint(); } + private Exchange createAzureEventHubExchange(final EventContext eventContext) { + final Exchange exchange = createExchange(true); + final Message message = exchange.getIn(); + + // set body as byte[] and let camel typeConverters do the job to convert + message.setBody(eventContext.getEventData().getBody()); + // set headers + message.setHeader(EventHubsConstants.PARTITION_ID, eventContext.getPartitionContext().getPartitionId()); + message.setHeader(EventHubsConstants.PARTITION_KEY, eventContext.getEventData().getPartitionKey()); + message.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset()); + message.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime()); + message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber()); + + return exchange; + } + + private Exchange createAzureEventHubExchange(final ErrorContext errorContext) { + final Exchange exchange = createExchange(true); + final Message message = exchange.getIn(); + + // set headers + message.setHeader(EventHubsConstants.PARTITION_ID, errorContext.getPartitionContext().getPartitionId()); + + // set exception + exchange.setException(errorContext.getThrowable()); + + return exchange; + } + private void onEventListener(final EventContext eventContext) { - final Exchange exchange = getEndpoint().createAzureEventHubExchange(eventContext); + final Exchange exchange = createAzureEventHubExchange(eventContext); // add exchange callback exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @@ -93,7 +123,7 @@ public class EventHubsConsumer extends DefaultConsumer { } private void onErrorListener(final ErrorContext errorContext) { - final Exchange exchange = getEndpoint().createAzureEventHubExchange(errorContext); + final Exchange exchange = createAzureEventHubExchange(errorContext); // log exception if an exception occurred and was not handled if (exchange.getException() != null) { diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java index 061a277..b9010e8 100644 --- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java +++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java @@ -16,13 +16,9 @@ */ package org.apache.camel.component.azure.eventhubs; -import com.azure.messaging.eventhubs.models.ErrorContext; -import com.azure.messaging.eventhubs.models.EventContext; import org.apache.camel.Category; import org.apache.camel.Component; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.UriEndpoint; @@ -71,32 +67,4 @@ public class EventHubsEndpoint extends DefaultEndpoint { this.configuration = configuration; } - public Exchange createAzureEventHubExchange(final EventContext eventContext) { - final Exchange exchange = createExchange(); - final Message message = exchange.getIn(); - - // set body as byte[] and let camel typeConverters do the job to convert - message.setBody(eventContext.getEventData().getBody()); - // set headers - message.setHeader(EventHubsConstants.PARTITION_ID, eventContext.getPartitionContext().getPartitionId()); - message.setHeader(EventHubsConstants.PARTITION_KEY, eventContext.getEventData().getPartitionKey()); - message.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset()); - message.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime()); - message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber()); - - return exchange; - } - - public Exchange createAzureEventHubExchange(final ErrorContext errorContext) { - final Exchange exchange = createExchange(); - final Message message = exchange.getIn(); - - // set headers - message.setHeader(EventHubsConstants.PARTITION_ID, errorContext.getPartitionContext().getPartitionId()); - - // set exception - exchange.setException(errorContext.getThrowable()); - - return exchange; - } } diff --git a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java index e8bd3e2..00e9490 100644 --- a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java +++ b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java @@ -81,7 +81,7 @@ public class BlobConsumer extends ScheduledBatchPollingConsumer { final BlobClientWrapper clientWrapper = new BlobClientWrapper(blobContainerClient.getBlobClient(blobName)); final BlobOperations operations = new BlobOperations(getEndpoint().getConfiguration(), clientWrapper); - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); BlobOperationResponse response; if (!ObjectHelper.isEmpty(getEndpoint().getConfiguration().getFileDir())) { diff --git a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java index 3785ed5..915f7ee 100644 --- a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java +++ b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java @@ -102,7 +102,7 @@ class DataLakeConsumer extends ScheduledBatchPollingConsumer { final DataLakeFileClientWrapper clientWrapper = new DataLakeFileClientWrapper(dataLakeFileSystemClient.getFileClient(fileName)); final DataLakeFileOperations operations = new DataLakeFileOperations(getEndpoint().getConfiguration(), clientWrapper); - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); DataLakeOperationResponse response; diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java index 28d2428..38d057f 100644 --- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java +++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java @@ -27,6 +27,7 @@ import com.azure.storage.queue.models.QueueMessageItem; import com.azure.storage.queue.models.QueueStorageException; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper; import org.apache.camel.component.azure.storage.queue.operations.QueueOperations; @@ -79,7 +80,7 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer { private Queue<Exchange> createExchanges(final List<QueueMessageItem> messageItems) { return messageItems .stream() - .map(queueMessageItem -> getEndpoint().createExchange(queueMessageItem)) + .map(this::createExchange) .collect(Collectors.toCollection(LinkedList::new)); } @@ -142,6 +143,16 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer { return total; } + private Exchange createExchange(final QueueMessageItem messageItem) { + final Exchange exchange = createExchange(true); + final Message message = exchange.getIn(); + + message.setBody(messageItem.getMessageText()); + message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap()); + + return exchange; + } + /** * Strategy to delete the message after being processed. * diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java index 525ae03..4b92bf1 100644 --- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java +++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueEndpoint.java @@ -70,16 +70,6 @@ public class QueueEndpoint extends DefaultEndpoint { ? configuration.getServiceClient() : QueueClientFactory.createQueueServiceClient(configuration); } - public Exchange createExchange(final QueueMessageItem messageItem) { - final Exchange exchange = createExchange(); - final Message message = exchange.getIn(); - - message.setBody(messageItem.getMessageText()); - message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap()); - - return exchange; - } - /** * The component configurations */ diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java index f3fc7b7..74a3638 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConsumer.java @@ -35,10 +35,10 @@ public class BlobServiceConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - Exchange exchange = super.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); try { getBlob(exchange); - super.getAsyncProcessor().process(exchange); + getProcessor().process(exchange); return 1; } catch (StorageException ex) { if (404 == ex.getHttpStatusCode()) { diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java index 5d665b8..4154b93 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java @@ -33,10 +33,10 @@ public class QueueServiceConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { - Exchange exchange = super.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); try { retrieveMessage(exchange); - super.getAsyncProcessor().process(exchange); + getProcessor().process(exchange); return 1; } catch (StorageException ex) { if (404 == ex.getHttpStatusCode()) { @@ -51,7 +51,6 @@ public class QueueServiceConsumer extends ScheduledPollConsumer { //TODO: Support the batch processing if needed, given that it is possible // to retrieve more than 1 message in one go, similarly to camel-aws/s3 consumer. QueueServiceUtil.retrieveMessage(exchange, getConfiguration()); - } protected QueueServiceConfiguration getConfiguration() {