This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a18b8c67533bd16340f0848b15df653ee2dc5a6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Thu Jan 16 16:21:39 2020 +0100

    CAMEL-14409: camel-core - ExtendedExchange for advanced API
---
 .../src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java  | 3 ++-
 .../src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java  | 3 ++-
 .../main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java    | 5 +++--
 .../java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java | 3 ++-
 .../camel/processor/aggregate/cassandra/CassandraCamelCodec.java     | 3 ++-
 .../main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java   | 3 ++-
 .../camel/component/cxf/CxfConsumerClientDisconnectedTest.java       | 3 ++-
 .../org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java   | 3 ++-
 .../component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java     | 3 ++-
 .../org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java   | 3 ++-
 .../java/org/apache/camel/component/directvm/DirectVmProcessor.java  | 3 ++-
 .../camel/component/disruptor/AbstractSynchronizedExchange.java      | 5 +++--
 .../java/org/apache/camel/component/disruptor/DisruptorConsumer.java | 5 +++--
 .../java/org/apache/camel/component/disruptor/DisruptorProducer.java | 5 +++--
 .../disruptor/DisruptorInOutChainedWithOnCompletionTest.java         | 3 ++-
 .../disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java      | 3 ++-
 .../disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java         | 3 ++-
 .../main/java/org/apache/camel/component/elsql/ElsqlProducer.java    | 3 ++-
 .../java/org/apache/camel/component/file/GenericFileConsumer.java    | 3 ++-
 .../org/apache/camel/component/file/remote/RemoteFileConsumer.java   | 3 ++-
 .../camel/component/google/mail/stream/GoogleMailStreamConsumer.java | 3 ++-
 .../apache/camel/component/google/pubsub/GooglePubsubConsumer.java   | 3 ++-
 .../src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java  | 3 ++-
 .../src/main/java/org/apache/camel/component/http/HttpProducer.java  | 3 ++-
 .../org/apache/camel/component/ignite/cache/IgniteCacheProducer.java | 3 ++-
 .../main/java/org/apache/camel/component/ironmq/IronMQConsumer.java  | 3 ++-
 .../src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java  | 3 ++-
 .../src/main/java/org/apache/camel/component/jpa/JpaHelper.java      | 5 +++--
 .../java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java   | 3 ++-
 .../src/main/java/org/apache/camel/component/mail/MailConsumer.java  | 3 ++-
 .../apache/camel/component/netty/http/DefaultNettyHttpBinding.java   | 5 +++--
 .../org/apache/camel/component/netty/http/NettyHttpProducer.java     | 3 ++-
 .../main/java/org/apache/camel/component/netty/NettyProducer.java    | 3 ++-
 .../src/main/java/org/apache/camel/component/nsq/NsqConsumer.java    | 3 ++-
 .../component/pg/replication/slot/PgReplicationSlotConsumer.java     | 3 ++-
 .../reactive/streams/engine/DefaultCamelReactiveStreamsService.java  | 3 ++-
 .../apache/camel/component/reactor/engine/ReactorStreamsService.java | 3 ++-
 .../apache/camel/component/rxjava/engine/RxJavaStreamsService.java   | 3 ++-
 .../src/main/java/org/apache/camel/component/seda/SedaConsumer.java  | 5 +++--
 .../src/main/java/org/apache/camel/component/seda/SedaProducer.java  | 5 +++--
 .../org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java     | 3 ++-
 .../src/main/java/org/apache/camel/component/sql/SqlProducer.java    | 3 ++-
 .../org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java    | 3 ++-
 .../camel/processor/aggregate/tarfile/TarAggregationStrategy.java    | 3 ++-
 .../src/main/java/org/apache/camel/component/vm/VmConsumer.java      | 3 ++-
 .../src/main/java/org/apache/camel/component/xslt/XsltBuilder.java   | 3 ++-
 .../camel/processor/aggregate/zipfile/ZipAggregationStrategy.java    | 3 ++-
 47 files changed, 102 insertions(+), 55 deletions(-)

diff --git 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 55a21c7..b1d1b49 100644
--- 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -31,6 +31,7 @@ import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
@@ -152,7 +153,7 @@ public class S3Consumer extends 
ScheduledBatchPollingConsumer {
             pendingExchanges = total - index - 1;
 
             // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new Synchronization() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 public void onComplete(Exchange exchange) {
                     processCommit(exchange);
                 }
diff --git 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
index c9d1c63..6f6b2cc 100644
--- 
a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
+++ 
b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -189,7 +190,7 @@ public class S3Endpoint extends ScheduledPollEndpoint {
             IOHelper.close(s3Object);
         } else {
             if (configuration.isAutocloseBody()) {
-                exchange.addOnCompletion(new SynchronizationAdapter() {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                     @Override
                     public void onDone(Exchange exchange) {
                         IOHelper.close(s3Object);
diff --git 
a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 
b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 2d57f97..3df7fa9 100644
--- 
a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ 
b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -37,6 +37,7 @@ import 
com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
@@ -165,7 +166,7 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
                 }
                 final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, 
repeatSeconds), delay, period,
                                                                                
                       TimeUnit.SECONDS);
-                exchange.addOnCompletion(new Synchronization() {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                     @Override
                     public void onComplete(Exchange exchange) {
                         cancelExtender(exchange);
@@ -185,7 +186,7 @@ public class SqsConsumer extends 
ScheduledBatchPollingConsumer {
             }
 
             // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new Synchronization() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 public void onComplete(Exchange exchange) {
                     processCommit(exchange);
                 }
diff --git 
a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
 
b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
index c9ecb30..2a9ee42 100644
--- 
a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
+++ 
b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java
@@ -25,6 +25,7 @@ import com.surftools.BeanstalkClient.Client;
 import com.surftools.BeanstalkClient.Job;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.beanstalk.processors.BuryCommand;
@@ -114,7 +115,7 @@ public class BeanstalkConsumer extends 
ScheduledPollConsumer {
                 if (!awaitJob) {
                     client.delete(job.getJobId());
                 } else {
-                    exchange.addOnCompletion(sync);
+                    
exchange.adapt(ExtendedExchange.class).addOnCompletion(sync);
                 }
 
                 return exchange;
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
index 6428224..7797766 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.DefaultExchangeHolder;
 
@@ -64,7 +65,7 @@ public class CassandraCamelCodec {
         if (fromEndpointUri != null) {
             Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
             if (fromEndpoint != null) {
-                answer.setFromEndpoint(fromEndpoint);
+                
answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
             }
         }
         return answer;
diff --git 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
index 19d197f..c6cfb95 100644
--- 
a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
+++ 
b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.commons.csv.CSVFormat;
@@ -129,7 +130,7 @@ abstract class CsvUnmarshaller {
                 CSVParser parser = new CSVParser(reader, format);
                 CsvIterator answer = new CsvIterator(parser, converter);
                 // add to UoW so we can close the iterator so it can release 
any resources
-                exchange.addOnCompletion(new CsvUnmarshalOnCompletion(answer));
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
CsvUnmarshalOnCompletion(answer));
                 return answer;
             } catch (Exception e) {
                 IOHelper.close(reader);
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
index 5364a97..f532e14 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java
@@ -20,6 +20,7 @@ import java.io.BufferedWriter;
 import java.io.OutputStreamWriter;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.Synchronization;
@@ -54,7 +55,7 @@ public class CxfConsumerClientDisconnectedTest extends 
CamelTestSupport {
                     .process(exchange-> {
                         Thread.sleep(100);
 
-                        exchange.addOnCompletion(new Synchronization() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
                             @Override
                             public void onComplete(Exchange exchange) {
                                 template.sendBody("mock:onComplete", "");
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
index 38fc677..b48f1d2 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.cxf;
 
+import org.apache.camel.ExtendedExchange;
 import org.w3c.dom.Node;
 
 import org.apache.camel.Exchange;
@@ -66,7 +67,7 @@ public class CxfConsumerStreamCacheTest extends 
CamelTestSupport {
                         cos.close();
                         exchange.getOut().setBody(cos.newStreamCache());
 
-                        exchange.addOnCompletion(new Synchronization() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
                             @Override
                             public void onComplete(Exchange exchange) {
                                 template.sendBody("mock:onComplete", "");
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
index c9d16a1..13248ad 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java
@@ -20,6 +20,7 @@ import java.io.BufferedWriter;
 import java.io.OutputStreamWriter;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.cxf.CXFTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -55,7 +56,7 @@ public class CxfRsConsumerClientDisconnectedTest extends 
CamelTestSupport {
                     .process(exchange-> {
                         Thread.sleep(100);
 
-                        exchange.addOnCompletion(new Synchronization() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
                             @Override
                             public void onComplete(Exchange exchange) {
                                 template.sendBody("mock:onComplete", "");
diff --git 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
index 7b78d84..471ea2a 100644
--- 
a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
+++ 
b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.cxf.jaxrs;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.cxf.CXFTestSupport;
 import org.apache.camel.component.cxf.jaxrs.testbean.Customer;
@@ -62,7 +63,7 @@ public class CxfRsStreamCacheTest extends CamelTestSupport {
                         cos.close();
                         exchange.getOut().setBody(cos.newStreamCache());
 
-                        exchange.addOnCompletion(new Synchronization() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
                             @Override
                             public void onComplete(Exchange exchange) {
                                 template.sendBody("mock:onComplete", "");
diff --git 
a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
 
b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
index 3a434f3..266d111 100644
--- 
a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
+++ 
b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.directvm;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -87,7 +88,7 @@ public final class DirectVmProcessor extends 
DelegateAsyncProcessor {
         // send a new copied exchange with new camel context (do not handover 
completions)
         Exchange newExchange = 
ExchangeHelper.copyExchangeAndSetCamelContext(exchange, 
endpoint.getCamelContext(), false);
         // set the from endpoint
-        newExchange.setFromEndpoint(endpoint);
+        newExchange.adapt(ExtendedExchange.class).setFromEndpoint(endpoint);
         // The StreamCache created by the child routes must not be 
         // closed by the unit of work of the child route, but by the unit of 
         // work of the parent route or grand parent route or grand grand 
parent route ...(in case of nesting).
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java
index 8069970..3709e41 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.disruptor;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.UnitOfWorkHelper;
 import org.slf4j.Logger;
@@ -31,7 +32,7 @@ public abstract class AbstractSynchronizedExchange implements 
SynchronizedExchan
 
     public AbstractSynchronizedExchange(Exchange exchange) {
         this.exchange = exchange;
-        synchronizations = exchange.handoverCompletions();
+        synchronizations = 
exchange.adapt(ExtendedExchange.class).handoverCompletions();
     }
 
     @Override
@@ -43,7 +44,7 @@ public abstract class AbstractSynchronizedExchange implements 
SynchronizedExchan
     public Exchange cancelAndGetOriginalExchange() {
         if (synchronizations != null) {
             for (Synchronization synchronization : synchronizations) {
-                exchange.addOnCompletion(synchronization);
+                
exchange.adapt(ExtendedExchange.class).addOnCompletion(synchronization);
             }
         }
 
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 31e90ff..77b74e4 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
@@ -138,7 +139,7 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
         final Exchange newExchange = ExchangeHelper
                 .copyExchangeAndSetCamelContext(exchange, 
endpoint.getCamelContext(), false);
         // set the from endpoint
-        newExchange.setFromEndpoint(endpoint);
+        newExchange.adapt(ExtendedExchange.class).setFromEndpoint(endpoint);
         return newExchange;
     }
 
@@ -163,7 +164,7 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
             // (see 
org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done).
             // To solve this problem, a new synchronization is set on the 
exchange that is to be
             // processed
-            result.addOnCompletion(new Synchronization() {
+            result.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 @Override
                 public void onComplete(Exchange exchange) {
                     synchronizedExchange.consumed(result);
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
index 9e736d0..88ae473 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -23,6 +23,7 @@ import com.lmax.disruptor.InsufficientCapacityException;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -81,7 +82,7 @@ public class DisruptorProducer extends DefaultAsyncProducer {
             final CountDownLatch latch = new CountDownLatch(1);
 
             // we should wait for the reply so install a on completion so we 
know when its complete
-            copy.addOnCompletion(new SynchronizationAdapter() {
+            copy.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 @Override
                 public void onDone(final Exchange response) {
                     // check for timeout, which then already would have 
invoked the latch
@@ -193,7 +194,7 @@ public class DisruptorProducer extends DefaultAsyncProducer 
{
         // use a new copy of the exchange to route async
         final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
handover);
         // set a new from endpoint to be the disruptor
-        copy.setFromEndpoint(endpoint);
+        copy.adapt(ExtendedExchange.class).setFromEndpoint(endpoint);
         return copy;
     }
 }
diff --git 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
index 811e8f9..4604985 100644
--- 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
+++ 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.disruptor;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -46,7 +47,7 @@ public class DisruptorInOutChainedWithOnCompletionTest 
extends CamelTestSupport
                     @Override
                     public void process(final Exchange exchange) throws 
Exception {
                         // should come in last
-                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                             @Override
                             public void onDone(final Exchange exchange) {
                                 template.sendBody("mock:c", 
"onCustomCompletion");
diff --git 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
index 9d524f2..01f7e26 100644
--- 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
+++ 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.disruptor;
 
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -56,7 +57,7 @@ public class DisruptorWaitForTaskCompleteOnCompletionTest 
extends CamelTestSuppo
                 from("direct:start").process(new Processor() {
                     @Override
                     public void process(final Exchange exchange) throws 
Exception {
-                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                             @Override
                             public void onDone(final Exchange exchange) {
                                 done += "A";
diff --git 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
index adad0de..5f8acea 100644
--- 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
+++ 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -56,7 +57,7 @@ public class DisruptorWaitForTaskNeverOnCompletionTest 
extends CamelTestSupport
                 from("direct:start").process(new Processor() {
                     @Override
                     public void process(final Exchange exchange) throws 
Exception {
-                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                        
exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                             @Override
                             public void onDone(final Exchange exchange) {
                                 done = done + "A";
diff --git 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 3077cd0..fdefbd7 100644
--- 
a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ 
b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -28,6 +28,7 @@ import javax.sql.DataSource;
 import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.component.sql.ResultSetIterator;
 import org.apache.camel.component.sql.ResultSetIteratorCompletion;
 import org.apache.camel.component.sql.SqlConstants;
@@ -219,7 +220,7 @@ public class ElsqlProducer extends DefaultProducer {
                 }
                 // we do not know the row count so we cannot set a ROW_COUNT 
header
                 // defer closing the iterator when the exchange is complete
-                exchange.addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
             }
         } catch (final Exception e) {
             // in case of exception then close all this before rethrow
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 76022f4..56d52ff 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -440,7 +441,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
             // register on completion callback that does the completion 
strategies
             // (for instance to move the file after we have processed it)
-            exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, 
operations, processStrategy, target, absoluteFileName));
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, 
absoluteFileName));
 
             log.debug("About to process file: {} using exchange: {}", target, 
exchange);
 
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 0a62c7d..25573fb 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
@@ -106,7 +107,7 @@ public abstract class RemoteFileConsumer<T> extends 
GenericFileConsumer<T> {
         // defer disconnect til the UoW is complete - but only the last 
exchange from the batch should do that
         boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, 
Boolean.class);
         if (isLast && getEndpoint().isDisconnect()) {
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange exchange) {
                     log.trace("processExchange disconnect from: {}", 
getEndpoint());
diff --git 
a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
 
b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
index 953ca13..d3c25fb 100644
--- 
a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
+++ 
b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
@@ -29,6 +29,7 @@ import 
com.google.api.services.gmail.model.ModifyMessageRequest;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
@@ -108,7 +109,7 @@ public class GoogleMailStreamConsumer extends 
ScheduledBatchPollingConsumer {
             pendingExchanges = total - index - 1;
 
             // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new Synchronization() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 public void onComplete(Exchange exchange) {
                     processCommit(exchange, unreadLabelId);
                 }
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index bfd78cf..3f47ff93 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -27,6 +27,7 @@ import com.google.api.services.pubsub.model.PullRequest;
 import com.google.api.services.pubsub.model.PullResponse;
 import com.google.api.services.pubsub.model.ReceivedMessage;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import 
org.apache.camel.component.google.pubsub.consumer.ExchangeAckTransaction;
 import org.apache.camel.spi.Synchronization;
@@ -147,7 +148,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
                         }
 
                         if (endpoint.getAckMode() != 
GooglePubsubConstants.AckMode.NONE) {
-                            
exchange.addOnCompletion(GooglePubsubConsumer.this.ackStrategy);
+                            
exchange.adapt(ExtendedExchange.class).addOnCompletion(GooglePubsubConsumer.this.ackStrategy);
                         }
 
                         try {
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 8e105cd..34f2fb3 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import javax.security.auth.login.Configuration;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -236,7 +237,7 @@ public final class HdfsConsumer extends 
ScheduledPollConsumer {
 
     protected void updateNewExchange(Exchange exchange, int index, 
HdfsInputStream hdfsFile) {
         // do not share unit of work
-        exchange.setUnitOfWork(null);
+        exchange.adapt(ExtendedExchange.class).setUnitOfWork(null);
 
         exchange.setProperty(Exchange.SPLIT_INDEX, index);
 
diff --git 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index 77ee082..716be67 100644
--- 
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++ 
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.http.helper.HttpMethodHelper;
@@ -213,7 +214,7 @@ public class HttpProducer extends DefaultProducer {
             final HttpResponse response = httpResponse;
             if (httpResponse != null && getEndpoint().isDisableStreamCache()) {
                 // close the stream at the end of the exchange to ensure it 
gets eventually closed later
-                exchange.addOnCompletion(new SynchronizationAdapter() {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                     @Override
                     public void onDone(Exchange exchange) {
                         try {
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
index 11dfc0a..f048d14 100644
--- 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
@@ -141,7 +142,7 @@ public class IgniteCacheProducer extends 
DefaultAsyncProducer {
 
         out.setBody(cursor.iterator());
 
-        exchange.addOnCompletion(new Synchronization() {
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
             @Override
             public void onFailure(Exchange exchange) {
                 cursor.close();
diff --git 
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
 
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
index fbb27d2..414edaf 100644
--- 
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
+++ 
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
@@ -24,6 +24,7 @@ import io.iron.ironmq.Message;
 import io.iron.ironmq.Messages;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ExchangeHelper;
@@ -96,7 +97,7 @@ public class IronMQConsumer extends 
ScheduledBatchPollingConsumer {
             // add on completion to handle after work when the exchange is done
             // if batchDelete is not enabled
             if (!getEndpoint().getConfiguration().isBatchDelete()) {
-                exchange.addOnCompletion(new Synchronization() {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                     final String reservationId = 
ExchangeHelper.getMandatoryHeader(exchange, 
IronMQConstants.MESSAGE_RESERVATION_ID, String.class);
                     final String messageid = 
ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_ID, 
String.class);
 
diff --git 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index 27df28e..dc2c37d 100644
--- 
a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++ 
b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import javax.sql.DataSource;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.support.PropertyBindingSupport;
@@ -321,7 +322,7 @@ public class JdbcProducer extends DefaultProducer {
         exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, 
iterator.getColumnNames());
         if (outputType == JdbcOutputType.StreamList) {
             exchange.getOut().setBody(new 
StreamListIterator(getEndpoint().getCamelContext(), 
getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator));
-            exchange.addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
             // do not close resources as we are in streaming mode
             answer = false;
         } else if (outputType == JdbcOutputType.SelectList) {
diff --git 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
index 5161b9a..87fb03d 100644
--- 
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
+++ 
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java
@@ -20,6 +20,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.springframework.orm.jpa.SharedEntityManagerCreator;
 
 /**
@@ -64,7 +65,7 @@ public final class JpaHelper {
             if (exchange != null) {
                 // we want to reuse the EM so store as property and make sure 
we close it when done with the exchange
                 exchange.setProperty(JpaConstants.ENTITY_MANAGER, em);
-                exchange.addOnCompletion(new 
JpaCloseEntityManagerOnCompletion(em));
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
JpaCloseEntityManagerOnCompletion(em));
             }
         }
 
@@ -74,7 +75,7 @@ public final class JpaHelper {
             if (exchange != null) {
                 // we want to reuse the EM so store as property and make sure 
we close it when done with the exchange
                 exchange.setProperty(JpaConstants.ENTITY_MANAGER, em);
-                exchange.addOnCompletion(new 
JpaCloseEntityManagerOnCompletion(em));
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
JpaCloseEntityManagerOnCompletion(em));
             }
         }
 
diff --git 
a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
 
b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
index 23d9df1..4ad646a 100644
--- 
a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
+++ 
b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.DefaultExchangeHolder;
 import org.fusesource.hawtbuf.Buffer;
@@ -78,7 +79,7 @@ public final class LevelDBCamelCodec {
         if (fromEndpointUri != null) {
             Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
             if (fromEndpoint != null) {
-                answer.setFromEndpoint(fromEndpoint);
+                
answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
             }
         }
         return answer;
diff --git 
a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
 
b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index ac4b306..d010332 100644
--- 
a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ 
b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -37,6 +37,7 @@ import com.sun.mail.imap.IMAPStore;
 import com.sun.mail.imap.SortTerm;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.attachment.Attachment;
@@ -202,7 +203,7 @@ public class MailConsumer extends 
ScheduledBatchPollingConsumer {
             final Message mail = 
exchange.getIn(MailMessage.class).getOriginalMessage();
 
             // add on completion to handle after work when the exchange is done
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 public void onComplete(Exchange exchange) {
                     processCommit(mail, exchange);
                 }
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 016a2f6..c1b6ac6 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -46,6 +46,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
@@ -103,7 +104,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             // for proxy use case pass the request body buffer directly to the 
response to avoid additional processing
             // we need to retain it so that the request can be released and we 
can keep the content
             answer.setBody(request.content().retain());
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange exchange) {
                     ReferenceCountUtil.release(request.content());
@@ -113,7 +114,7 @@ public class DefaultNettyHttpBinding implements 
NettyHttpBinding, Cloneable {
             // turn the body into stream cached (on the client/consumer side 
we can facade the netty stream instead of converting to byte array)
             NettyChannelBufferStreamCache cache = new 
NettyChannelBufferStreamCache(request.content());
             // add on completion to the cache which is needed for Camel to 
keep track of the lifecycle of the cache
-            exchange.addOnCompletion(new 
NettyChannelBufferStreamCacheOnCompletion(cache));
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
NettyChannelBufferStreamCacheOnCompletion(cache));
             answer.setBody(cache);
         }
         return answer;
diff --git 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
index 5ecc8a9..87732a3 100644
--- 
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
+++ 
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java
@@ -26,6 +26,7 @@ import io.netty.handler.codec.http.HttpUtil;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.component.netty.NettyConfiguration;
 import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyProducer;
@@ -118,7 +119,7 @@ public class NettyHttpProducer extends NettyProducer {
                             response.content().retain();
 
                             // need to release the response when we are done
-                            exchange.addOnCompletion(new 
SynchronizationAdapter() {
+                            
exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                                 @Override
                                 public void onDone(Exchange exchange) {
                                     if (response.refCnt() > 0) {
diff --git 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 08887ce..1bd3e6e 100644
--- 
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ 
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -43,6 +43,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -264,7 +265,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         if (getConfiguration().isReuseChannel() && 
exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
             exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel);
             // and defer closing the channel until we are done routing the 
exchange
-            exchange.addOnCompletion(new SynchronizationAdapter() {
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 @Override
                 public void onComplete(Exchange exchange) {
                     // should channel be closed after complete?
diff --git 
a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
 
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index 5165d59..12e0a0d 100644
--- 
a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ 
b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -26,6 +26,7 @@ import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
 import com.github.brainlag.nsq.lookup.NSQLookup;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
@@ -109,7 +110,7 @@ public class NsqConsumer extends DefaultConsumer {
                 if (configuration.getAutoFinish()) {
                     msg.finished();
                 } else {
-                    exchange.addOnCompletion(new NsqSynchronization(msg, 
(int)configuration.getRequeueInterval()));
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
NsqSynchronization(msg, (int)configuration.getRequeueInterval()));
                 }
                 processor.process(exchange);
             } catch (Exception e) {
diff --git 
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
 
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 4390ae4..05609c6 100644
--- 
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ 
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
@@ -136,7 +137,7 @@ public class PgReplicationSlotConsumer extends 
ScheduledPollConsumer {
             }
         }, delay, delay, TimeUnit.SECONDS);
 
-        exchange.addOnCompletion(new Synchronization() {
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
             @Override
             public void onComplete(Exchange exchange) {
                 processCommit(exchange);
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
index 19d50a6..b34cd76 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java
@@ -35,6 +35,7 @@ import javax.management.openmbean.TabularType;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
@@ -171,7 +172,7 @@ public class DefaultCamelReactiveStreamsService extends 
ServiceSupport implement
 
         DelayedMonoPublisher<Exchange> publisher = new 
DelayedMonoPublisher<>(this.workerPool);
 
-        data.addOnCompletion(new Synchronization() {
+        data.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
             @Override
             public void onComplete(Exchange exchange) {
                 publisher.setData(exchange);
diff --git 
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
 
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
index fee248d..7640274 100644
--- 
a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
+++ 
b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.builder.RouteBuilder;
 import 
org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -314,7 +315,7 @@ final class ReactorStreamsService extends ServiceSupport 
implements CamelReactiv
         }
 
         return Mono.<Exchange>create(
-            sink -> data.addOnCompletion(new Synchronization() {
+            sink -> data.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 @Override
                 public void onComplete(Exchange exchange) {
                     sink.success(exchange);
diff --git 
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
 
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
index 50e83a1..75e8ec9 100644
--- 
a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
+++ 
b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java
@@ -25,6 +25,7 @@ import io.reactivex.Flowable;
 import io.reactivex.Single;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.builder.RouteBuilder;
 import 
org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -304,7 +305,7 @@ final class RxJavaStreamsService extends ServiceSupport 
implements CamelReactive
         }
 
         Single<Exchange> source = Single.<Exchange>create(
-            emitter -> data.addOnCompletion(new Synchronization() {
+            emitter -> data.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                 @Override
                 public void onComplete(Exchange exchange) {
                     emitter.onSuccess(exchange);
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index e6c2bde..0c7b6fe 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
@@ -220,7 +221,7 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
         // send a new copied exchange with new camel context
         Exchange newExchange = 
ExchangeHelper.copyExchangeAndSetCamelContext(exchange, 
getEndpoint().getCamelContext());
         // set the from endpoint
-        newExchange.setFromEndpoint(getEndpoint());
+        
newExchange.adapt(ExtendedExchange.class).setFromEndpoint(getEndpoint());
         return newExchange;
     }
 
@@ -250,7 +251,7 @@ public class SedaConsumer extends DefaultConsumer 
implements Runnable, ShutdownA
             }
 
             // handover completions, as we need to done this when the 
multicast is done
-            final List<Synchronization> completions = 
exchange.handoverCompletions();
+            final List<Synchronization> completions = 
exchange.adapt(ExtendedExchange.class).handoverCompletions();
 
             // use a multicast processor to process it
             AsyncProcessor mp = getEndpoint().getConsumerMulticastProcessor();
diff --git 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index a0ea010..1906d97 100644
--- 
a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ 
b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.WaitForTaskToComplete;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
@@ -67,7 +68,7 @@ public class SedaProducer extends DefaultAsyncProducer {
             final CountDownLatch latch = new CountDownLatch(1);
 
             // we should wait for the reply so install a on completion so we 
know when its complete
-            copy.addOnCompletion(new SynchronizationAdapter() {
+            copy.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
                 @Override
                 public void onDone(Exchange response) {
                     // check for timeout, which then already would have 
invoked the latch
@@ -165,7 +166,7 @@ public class SedaProducer extends DefaultAsyncProducer {
         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
handover, true,
             synchronization -> 
!synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion"));
         // set a new from endpoint to be the seda queue
-        copy.setFromEndpoint(endpoint);
+        copy.adapt(ExtendedExchange.class).setFromEndpoint(endpoint);
         return copy;
     }
 
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index ec6168e..7990e4d 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -38,6 +38,7 @@ import javax.jms.Session;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -564,7 +565,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             aggregationStrategy.onCompletion(exchange);
 
             SessionCompletion sessionCompletion = new 
SessionCompletion(session);
-            exchange.addOnCompletion(sessionCompletion);
+            
exchange.adapt(ExtendedExchange.class).addOnCompletion(sessionCompletion);
             try {
                 getProcessor().process(exchange);
                 long total = MESSAGE_PROCESSED.addAndGet(batchSize);
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 13d50fd..484660a 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.support.DefaultProducer;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
@@ -294,7 +295,7 @@ public class SqlProducer extends DefaultProducer {
                 }
                 // we do not know the row count so we cannot set a ROW_COUNT 
header
                 // defer closing the iterator when the exchange is complete
-                exchange.addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
ResultSetIteratorCompletion(iterator));
             }
         } catch (Exception e) {
             // in case of exception then close all this before rethrow
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
index 619fcd5..e44fec9 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java
@@ -27,6 +27,7 @@ import java.io.OutputStream;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.DefaultExchangeHolder;
 import org.apache.camel.util.IOHelper;
@@ -75,7 +76,7 @@ public class JdbcCamelCodec {
         if (fromEndpointUri != null) {
             Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri);
             if (fromEndpoint != null) {
-                answer.setFromEndpoint(fromEndpoint);
+                
answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
             }
         }
         return answer;
diff --git 
a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
 
b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
index 23a670e..e38e285 100644
--- 
a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
+++ 
b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.component.file.FileConsumer;
 import org.apache.camel.component.file.GenericFile;
@@ -149,7 +150,7 @@ public class TarAggregationStrategy implements 
AggregationStrategy {
                 throw new GenericFileOperationFailedException(e.getMessage(), 
e);
             }
             answer = newExchange;
-            answer.addOnCompletion(new DeleteTarFileOnCompletion(tarFile));
+            answer.adapt(ExtendedExchange.class).addOnCompletion(new 
DeleteTarFileOnCompletion(tarFile));
         } else {
             tarFile = oldExchange.getIn().getBody(File.class);
         }
diff --git 
a/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java
 
b/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java
index f82b9e1..6e9430c 100644
--- 
a/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java
+++ 
b/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.vm;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.seda.SedaConsumer;
 import org.apache.camel.support.ExchangeHelper;
@@ -52,7 +53,7 @@ public class VmConsumer extends SedaConsumer implements 
CamelContextAware {
         // send a new copied exchange with the camel context from this consumer
         Exchange newExchange = 
ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getCamelContext());
         // set the from endpoint
-        newExchange.setFromEndpoint(getEndpoint());
+        
newExchange.adapt(ExtendedExchange.class).setFromEndpoint(getEndpoint());
         return newExchange;
     }
 
diff --git 
a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java
 
b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java
index 2eaabcb..13d24a3 100644
--- 
a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java
+++ 
b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java
@@ -37,6 +37,7 @@ import javax.xml.transform.URIResolver;
 import javax.xml.transform.sax.SAXSource;
 import javax.xml.transform.stream.StreamSource;
 
+import org.apache.camel.ExtendedExchange;
 import org.xml.sax.EntityResolver;
 
 import org.apache.camel.Exchange;
@@ -95,7 +96,7 @@ public class XsltBuilder implements Processor {
         if (isDeleteOutputFile()) {
             // add on completion so we can delete the file when the Exchange 
is done
             String fileName = ExchangeHelper.getMandatoryHeader(exchange, 
Exchange.XSLT_FILE_NAME, String.class);
-            exchange.addOnCompletion(new XsltBuilderOnCompletion(fileName));
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
XsltBuilderOnCompletion(fileName));
         }
 
         Transformer transformer = getTransformer();
diff --git 
a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
 
b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
index 04ae295..d20eec1 100644
--- 
a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
+++ 
b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.component.file.FileConsumer;
 import org.apache.camel.component.file.GenericFile;
@@ -162,7 +163,7 @@ public class ZipAggregationStrategy implements 
AggregationStrategy {
                 throw new GenericFileOperationFailedException(e.getMessage(), 
e);
             }
             answer = newExchange;
-            answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile));
+            answer.adapt(ExtendedExchange.class).addOnCompletion(new 
DeleteZipFileOnCompletion(zipFile));
         } else {
             zipFile = oldExchange.getIn().getBody(File.class);
         }

Reply via email to