This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch opt-consumer in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3c78080edc36aa5caed4a44cecb9d6d35b5e2ef3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Mar 9 17:41:43 2021 +0100 CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations. --- .../aws2/ddbstream/Ddb2StreamConsumer.java | 10 +++----- .../component/aws2/kinesis/Kinesis2Consumer.java | 10 +++----- .../camel/component/aws2/s3/AWS2S3Consumer.java | 10 +++----- .../camel/component/aws2/sqs/Sqs2Consumer.java | 6 +++-- .../azure/eventhubs/EventHubsConsumer.java | 6 +++-- .../component/azure/storage/blob/BlobConsumer.java | 6 +++-- .../azure/storage/datalake/DataLakeConsumer.java | 5 +++- .../azure/storage/queue/QueueConsumer.java | 5 +++- .../docker/consumer/DockerEventsConsumer.java | 12 +++------- .../docker/consumer/DockerStatsConsumer.java | 12 +++------- .../stream/GoogleCalendarStreamConsumer.java | 7 +++--- .../mail/stream/GoogleMailStreamConsumer.java | 4 +++- .../sheets/stream/GoogleSheetsStreamConsumer.java | 5 +++- .../google/storage/GoogleCloudStorageConsumer.java | 11 ++++----- .../cache/IgniteCacheContinuousQueryConsumer.java | 15 +++--------- .../camel/component/minio/MinioConsumer.java | 4 ++-- .../component/paho/mqtt5/PahoMqtt5Consumer.java | 7 +++--- .../apache/camel/component/paho/PahoConsumer.java | 9 +++----- .../component/salesforce/SalesforceConsumer.java | 27 ++++------------------ .../camel/component/slack/SlackConsumer.java | 7 +++--- .../component/vertx/kafka/VertxKafkaConsumer.java | 3 ++- 21 files changed, 71 insertions(+), 110 deletions(-) diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java index 32bcb64..b952176 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java @@ -85,13 +85,9 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { while (!exchanges.isEmpty()) { final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); - LOG.trace("Processing exchange [{}] started.", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Processing exchange [{}] done.", exchange); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); processedExchanges++; } return processedExchanges; diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index fcda12b..58d083f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -104,13 +104,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { while (!exchanges.isEmpty()) { final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); - LOG.trace("Processing exchange [{}] started.", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Processing exchange [{}] done.", exchange); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); processedExchanges++; } return processedExchanges; diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index a4e4fef..a1a4d135 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -283,13 +283,9 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { } }); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Processing exchange [{}] done.", exchange); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 3cc5cb7..c1f6421 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExtendedExchange; @@ -214,8 +215,9 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { } }); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; diff --git a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index 56ccee6..0bb6c84 100644 --- a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.azure.eventhubs; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; @@ -118,8 +119,9 @@ public class EventHubsConsumer extends DefaultConsumer { processRollback(exchange); } }); - // send message to next processor in the route - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } private void onErrorListener(final ErrorContext errorContext) { diff --git a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java index 00e9490..115e797 100644 --- a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java +++ b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java @@ -24,6 +24,7 @@ import java.util.Queue; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobStorageException; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; @@ -149,8 +150,9 @@ public class BlobConsumer extends ScheduledBatchPollingConsumer { } }); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; } diff --git a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java index 915f7ee..0189931 100644 --- a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java +++ b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java @@ -24,6 +24,7 @@ import java.util.Queue; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.models.DataLakeStorageException; import com.azure.storage.file.datalake.models.PathItem; +import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; @@ -148,7 +149,9 @@ class DataLakeConsumer extends ScheduledBatchPollingConsumer { processRollback(exchange); } }); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java index 38d057f..b5e8969 100644 --- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java +++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import com.azure.storage.queue.QueueServiceClient; import com.azure.storage.queue.models.QueueMessageItem; import com.azure.storage.queue.models.QueueStorageException; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; @@ -138,7 +139,9 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer { }); LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; } diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java index 4ab51f6..8a62908 100644 --- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java +++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java @@ -92,15 +92,9 @@ public class DockerEventsConsumer extends DefaultConsumer { Message message = exchange.getIn(); message.setBody(event); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } } } diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java index 4160f37..91d4b64 100644 --- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java +++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java @@ -82,15 +82,9 @@ public class DockerStatsConsumer extends DefaultConsumer { Message message = exchange.getIn(); message.setBody(statistics); - LOGGER.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } } } diff --git a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java index d129b35..100be42 100644 --- a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java +++ b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java @@ -30,6 +30,7 @@ import com.google.api.services.calendar.model.Events; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -152,7 +153,7 @@ public class GoogleCalendarStreamConsumer extends ScheduledBatchPollingConsumer private DateTime retrieveLastUpdateDate(List<Date> dateList) { Date finalLastUpdate; if (!dateList.isEmpty()) { - dateList.sort((o1, o2) -> o1.compareTo(o2)); + dateList.sort(Date::compareTo); Date lastUpdateDate = dateList.get(dateList.size() - 1); java.util.Calendar calendar = java.util.Calendar.getInstance(); calendar.setTime(lastUpdateDate); @@ -179,9 +180,7 @@ public class GoogleCalendarStreamConsumer extends ScheduledBatchPollingConsumer // update pending number of exchanges pendingExchanges = total - index - 1; - getAsyncProcessor().process(exchange, doneSync -> { - // noop - }); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } return total; } diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java index da70219..e12c501 100644 --- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java +++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java @@ -29,12 +29,14 @@ import com.google.api.services.gmail.model.Message; import com.google.api.services.gmail.model.MessagePart; import com.google.api.services.gmail.model.MessagePartHeader; import com.google.api.services.gmail.model.ModifyMessageRequest; +import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -127,7 +129,7 @@ public class GoogleMailStreamConsumer extends ScheduledBatchPollingConsumer { } }); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done")); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } return total; diff --git a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java index eb04b5d..41b708a 100644 --- a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java +++ b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java @@ -28,6 +28,7 @@ import com.google.api.services.sheets.v4.Sheets; import com.google.api.services.sheets.v4.model.BatchGetValuesResponse; import com.google.api.services.sheets.v4.model.Spreadsheet; import com.google.api.services.sheets.v4.model.ValueRange; +import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -145,7 +146,9 @@ public class GoogleSheetsStreamConsumer extends ScheduledBatchPollingConsumer { // update pending number of exchanges pendingExchanges = total - index - 1; - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done")); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; diff --git a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java index b8cede8..2a299b2 100644 --- a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java +++ b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java @@ -36,6 +36,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -177,13 +178,9 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer { } }); - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Processing exchange [{}] done.", exchange); - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } return total; diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java index 8f7e81f..4e3e9c5 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java @@ -28,6 +28,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.ignite.IgniteConstants; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; @@ -137,22 +138,12 @@ public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer { exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE, entry.getEventType()); exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE, entry.getOldValue()); exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, entry.getKey()); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // do nothing - } - }); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } private void fireGroupedExchange(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) { Exchange exchange = createExchange(events); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // do nothing - } - }); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } private Exchange createExchange(Object payload) { diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java index e98b295..b3cc1c8 100644 --- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java +++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java @@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CastUtils; @@ -288,8 +289,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { } }); - LOG.trace("Processing exchange ..."); - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done.")); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } return total; diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java index af8345a..224c070 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.paho.mqtt5; +import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -105,9 +106,9 @@ public class PahoMqtt5Consumer extends DefaultConsumer { LOG.debug("Message arrived on topic: {} -> {}", topic, message); Exchange exchange = createExchange(message, topic); - getAsyncProcessor().process(exchange, doneSync -> { - // noop - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } @Override diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index f908efd..197b066 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -94,12 +94,9 @@ public class PahoConsumer extends DefaultConsumer { LOG.debug("Message arrived on topic: {} -> {}", topic, message); Exchange exchange = createExchange(message, topic); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // noop - } - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } @Override diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java index 5f60fb1..3f2836d 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java @@ -142,7 +142,7 @@ public class SalesforceConsumer extends DefaultConsumer { LOG.debug("Received event {} on channel {}", channel.getId(), channel.getChannelId()); } - final Exchange exchange = createExchange(false); + final Exchange exchange = createExchange(true); final org.apache.camel.Message in = exchange.getIn(); switch (messageKind) { @@ -159,28 +159,9 @@ public class SalesforceConsumer extends DefaultConsumer { throw new IllegalStateException("Unknown message kind: " + messageKind); } - try { - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - // noop - if (LOG.isTraceEnabled()) { - LOG.trace("Done processing event: {} {}", channel.getId(), - doneSync ? "synchronously" : "asynchronously"); - } - } - }); - } catch (final Exception e) { - final String msg = String.format("Error processing %s: %s", exchange, e); - handleException(msg, new SalesforceException(msg, e)); - } finally { - final Exception ex = exchange.getException(); - if (ex != null) { - final String msg = String.format("Unhandled exception: %s", ex.getMessage()); - handleException(msg, new SalesforceException(msg, ex)); - } - releaseExchange(exchange, false); - } + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } @SuppressWarnings("unchecked") diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java index 4296483..2379803 100644 --- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java +++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java @@ -28,6 +28,7 @@ import com.slack.api.methods.response.conversations.ConversationsHistoryResponse import com.slack.api.methods.response.conversations.ConversationsListResponse; import com.slack.api.model.Conversation; import com.slack.api.model.Message; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -122,9 +123,9 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer { // update pending number of exchanges pendingExchanges = total - index - 1; - getAsyncProcessor().process(exchange, doneSync -> { - // noop - }); + // use default consumer callback + AsyncCallback cb = defaultConsumerCallback(exchange, true); + getAsyncProcessor().process(exchange, cb); } return total; diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java index 75f1110..3327b1a 100644 --- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java +++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java @@ -30,6 +30,7 @@ import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfigurat import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.EmptyAsyncCallback; import org.apache.camel.support.SynchronizationAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,7 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable { // add exchange callback exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); // send message to next processor in the route - getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange)); + getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } private void onErrorListener(final Throwable error) {