This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch opt-consumer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3c78080edc36aa5caed4a44cecb9d6d35b5e2ef3
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Mar 9 17:41:43 2021 +0100

    CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations.
---
 .../aws2/ddbstream/Ddb2StreamConsumer.java         | 10 +++-----
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 10 +++-----
 .../camel/component/aws2/s3/AWS2S3Consumer.java    | 10 +++-----
 .../camel/component/aws2/sqs/Sqs2Consumer.java     |  6 +++--
 .../azure/eventhubs/EventHubsConsumer.java         |  6 +++--
 .../component/azure/storage/blob/BlobConsumer.java |  6 +++--
 .../azure/storage/datalake/DataLakeConsumer.java   |  5 +++-
 .../azure/storage/queue/QueueConsumer.java         |  5 +++-
 .../docker/consumer/DockerEventsConsumer.java      | 12 +++-------
 .../docker/consumer/DockerStatsConsumer.java       | 12 +++-------
 .../stream/GoogleCalendarStreamConsumer.java       |  7 +++---
 .../mail/stream/GoogleMailStreamConsumer.java      |  4 +++-
 .../sheets/stream/GoogleSheetsStreamConsumer.java  |  5 +++-
 .../google/storage/GoogleCloudStorageConsumer.java | 11 ++++-----
 .../cache/IgniteCacheContinuousQueryConsumer.java  | 15 +++---------
 .../camel/component/minio/MinioConsumer.java       |  4 ++--
 .../component/paho/mqtt5/PahoMqtt5Consumer.java    |  7 +++---
 .../apache/camel/component/paho/PahoConsumer.java  |  9 +++-----
 .../component/salesforce/SalesforceConsumer.java   | 27 ++++------------------
 .../camel/component/slack/SlackConsumer.java       |  7 +++---
 .../component/vertx/kafka/VertxKafkaConsumer.java  |  3 ++-
 21 files changed, 71 insertions(+), 110 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 32bcb64..b952176 100644
--- 
a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ 
b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -85,13 +85,9 @@ public class Ddb2StreamConsumer extends 
ScheduledBatchPollingConsumer {
         while (!exchanges.isEmpty()) {
             final Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
 
-            LOG.trace("Processing exchange [{}] started.", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    LOG.trace("Processing exchange [{}] done.", exchange);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
             processedExchanges++;
         }
         return processedExchanges;
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index fcda12b..58d083f 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -104,13 +104,9 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
         while (!exchanges.isEmpty()) {
             final Exchange exchange = ObjectHelper.cast(Exchange.class, 
exchanges.poll());
 
-            LOG.trace("Processing exchange [{}] started.", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    LOG.trace("Processing exchange [{}] done.", exchange);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
             processedExchanges++;
         }
         return processedExchanges;
diff --git 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index a4e4fef..a1a4d135 100644
--- 
a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ 
b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -283,13 +283,9 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    LOG.trace("Processing exchange [{}] done.", exchange);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
 
         return total;
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 3cc5cb7..c1f6421 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
@@ -214,8 +215,9 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
 
         return total;
diff --git 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 56ccee6..0bb6c84 100644
--- 
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ 
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.azure.eventhubs;
 import com.azure.messaging.eventhubs.EventProcessorClient;
 import com.azure.messaging.eventhubs.models.ErrorContext;
 import com.azure.messaging.eventhubs.models.EventContext;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
@@ -118,8 +119,9 @@ public class EventHubsConsumer extends DefaultConsumer {
                 processRollback(exchange);
             }
         });
-        // send message to next processor in the route
-        getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     private void onErrorListener(final ErrorContext errorContext) {
diff --git 
a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
 
b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
index 00e9490..115e797 100644
--- 
a/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
+++ 
b/components/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
@@ -24,6 +24,7 @@ import java.util.Queue;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobStorageException;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
@@ -149,8 +150,9 @@ public class BlobConsumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
         return total;
     }
diff --git 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
index 915f7ee..0189931 100644
--- 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
+++ 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
@@ -24,6 +24,7 @@ import java.util.Queue;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import com.azure.storage.file.datalake.models.PathItem;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
@@ -148,7 +149,9 @@ class DataLakeConsumer extends 
ScheduledBatchPollingConsumer {
                     processRollback(exchange);
                 }
             });
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
 
         return total;
diff --git 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index 38d057f..b5e8969 100644
--- 
a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++ 
b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import com.azure.storage.queue.QueueServiceClient;
 import com.azure.storage.queue.models.QueueMessageItem;
 import com.azure.storage.queue.models.QueueStorageException;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
@@ -138,7 +139,9 @@ public class QueueConsumer extends 
ScheduledBatchPollingConsumer {
             });
 
             LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
         return total;
     }
diff --git 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
index 4ab51f6..8a62908 100644
--- 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
+++ 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
@@ -92,15 +92,9 @@ public class DockerEventsConsumer extends DefaultConsumer {
             Message message = exchange.getIn();
             message.setBody(event);
 
-            LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
-                    }
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
     }
 }
diff --git 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
index 4160f37..91d4b64 100644
--- 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
+++ 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
@@ -82,15 +82,9 @@ public class DockerStatsConsumer extends DefaultConsumer {
             Message message = exchange.getIn();
             message.setBody(statistics);
 
-            LOGGER.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
-                    }
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
     }
 }
diff --git 
a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
 
b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
index d129b35..100be42 100644
--- 
a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
+++ 
b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
@@ -30,6 +30,7 @@ import com.google.api.services.calendar.model.Events;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -152,7 +153,7 @@ public class GoogleCalendarStreamConsumer extends 
ScheduledBatchPollingConsumer
     private DateTime retrieveLastUpdateDate(List<Date> dateList) {
         Date finalLastUpdate;
         if (!dateList.isEmpty()) {
-            dateList.sort((o1, o2) -> o1.compareTo(o2));
+            dateList.sort(Date::compareTo);
             Date lastUpdateDate = dateList.get(dateList.size() - 1);
             java.util.Calendar calendar = java.util.Calendar.getInstance();
             calendar.setTime(lastUpdateDate);
@@ -179,9 +180,7 @@ public class GoogleCalendarStreamConsumer extends 
ScheduledBatchPollingConsumer
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
-            getAsyncProcessor().process(exchange, doneSync -> {
-                // noop
-            });
+            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
         }
         return total;
     }
diff --git 
a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
 
b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
index da70219..e12c501 100644
--- 
a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
+++ 
b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
@@ -29,12 +29,14 @@ import com.google.api.services.gmail.model.Message;
 import com.google.api.services.gmail.model.MessagePart;
 import com.google.api.services.gmail.model.MessagePartHeader;
 import com.google.api.services.gmail.model.ModifyMessageRequest;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -127,7 +129,7 @@ public class GoogleMailStreamConsumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange done"));
+            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
         }
 
         return total;
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
index eb04b5d..41b708a 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
@@ -28,6 +28,7 @@ import com.google.api.services.sheets.v4.Sheets;
 import com.google.api.services.sheets.v4.model.BatchGetValuesResponse;
 import com.google.api.services.sheets.v4.model.Spreadsheet;
 import com.google.api.services.sheets.v4.model.ValueRange;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -145,7 +146,9 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange done"));
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
 
         return total;
diff --git 
a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
 
b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
index b8cede8..2a299b2 100644
--- 
a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
+++ 
b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
@@ -36,6 +36,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -177,13 +178,9 @@ public class GoogleCloudStorageConsumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            LOG.trace("Processing exchange [{}]...", exchange);
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    LOG.trace("Processing exchange [{}] done.", exchange);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
         }
 
         return total;
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
index 8f7e81f..4e3e9c5 100644
--- 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
@@ -28,6 +28,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.ignite.IgniteConstants;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -137,22 +138,12 @@ public class IgniteCacheContinuousQueryConsumer extends 
DefaultConsumer {
         exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE, 
entry.getEventType());
         exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE, 
entry.getOldValue());
         exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, 
entry.getKey());
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
-                // do nothing
-            }
-        });
+        getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
     }
 
     private void fireGroupedExchange(Iterable<CacheEntryEvent<? extends 
Object, ? extends Object>> events) {
         Exchange exchange = createExchange(events);
-        getAsyncProcessor().process(exchange, new AsyncCallback() {
-            @Override
-            public void done(boolean doneSync) {
-                // do nothing
-            }
-        });
+        getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
     }
 
     private Exchange createExchange(Object payload) {
diff --git 
a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
 
b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index e98b295..b3cc1c8 100644
--- 
a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ 
b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
@@ -288,8 +289,7 @@ public class MinioConsumer extends 
ScheduledBatchPollingConsumer {
                 }
             });
 
-            LOG.trace("Processing exchange ...");
-            getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange done."));
+            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
         }
 
         return total;
diff --git 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
index af8345a..224c070 100644
--- 
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
+++ 
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.paho.mqtt5;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -105,9 +106,9 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, 
message);
                 Exchange exchange = createExchange(message, topic);
 
-                getAsyncProcessor().process(exchange, doneSync -> {
-                    // noop
-                });
+                // use default consumer callback
+                AsyncCallback cb = defaultConsumerCallback(exchange, true);
+                getAsyncProcessor().process(exchange, cb);
             }
 
             @Override
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index f908efd..197b066 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -94,12 +94,9 @@ public class PahoConsumer extends DefaultConsumer {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, 
message);
                 Exchange exchange = createExchange(message, topic);
 
-                getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    @Override
-                    public void done(boolean doneSync) {
-                        // noop
-                    }
-                });
+                // use default consumer callback
+                AsyncCallback cb = defaultConsumerCallback(exchange, true);
+                getAsyncProcessor().process(exchange, cb);
             }
 
             @Override
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index 5f60fb1..3f2836d 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -142,7 +142,7 @@ public class SalesforceConsumer extends DefaultConsumer {
             LOG.debug("Received event {} on channel {}", channel.getId(), 
channel.getChannelId());
         }
 
-        final Exchange exchange = createExchange(false);
+        final Exchange exchange = createExchange(true);
         final org.apache.camel.Message in = exchange.getIn();
 
         switch (messageKind) {
@@ -159,28 +159,9 @@ public class SalesforceConsumer extends DefaultConsumer {
                 throw new IllegalStateException("Unknown message kind: " + 
messageKind);
         }
 
-        try {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean doneSync) {
-                    // noop
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Done processing event: {} {}", 
channel.getId(),
-                                doneSync ? "synchronously" : "asynchronously");
-                    }
-                }
-            });
-        } catch (final Exception e) {
-            final String msg = String.format("Error processing %s: %s", 
exchange, e);
-            handleException(msg, new SalesforceException(msg, e));
-        } finally {
-            final Exception ex = exchange.getException();
-            if (ex != null) {
-                final String msg = String.format("Unhandled exception: %s", 
ex.getMessage());
-                handleException(msg, new SalesforceException(msg, ex));
-            }
-            releaseExchange(exchange, false);
-        }
+        // use default consumer callback
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
index 4296483..2379803 100644
--- 
a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
+++ 
b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -28,6 +28,7 @@ import 
com.slack.api.methods.response.conversations.ConversationsHistoryResponse
 import com.slack.api.methods.response.conversations.ConversationsListResponse;
 import com.slack.api.model.Conversation;
 import com.slack.api.model.Message;
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -122,9 +123,9 @@ public class SlackConsumer extends 
ScheduledBatchPollingConsumer {
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
-            getAsyncProcessor().process(exchange, doneSync -> {
-                // noop
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, true);
+            getAsyncProcessor().process(exchange, cb);
         }
 
         return total;
diff --git 
a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
 
b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
index 75f1110..3327b1a 100644
--- 
a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
+++ 
b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
@@ -30,6 +30,7 @@ import 
org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfigurat
 import 
org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,7 +117,7 @@ public class VertxKafkaConsumer extends DefaultConsumer 
implements Suspendable {
         // add exchange callback
         exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         // send message to next processor in the route
-        getAsyncProcessor().process(exchange, doneSync -> 
LOG.trace("Processing exchange [{}] done.", exchange));
+        getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
     }
 
     private void onErrorListener(final Throwable error) {

Reply via email to