This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8a18b8c67533bd16340f0848b15df653ee2dc5a6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jan 16 16:21:39 2020 +0100 CAMEL-14409: camel-core - ExtendedExchange for advanced API --- .../src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java | 3 ++- .../src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java | 3 ++- .../main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java | 5 +++-- .../java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java | 3 ++- .../camel/processor/aggregate/cassandra/CassandraCamelCodec.java | 3 ++- .../main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java | 3 ++- .../camel/component/cxf/CxfConsumerClientDisconnectedTest.java | 3 ++- .../org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java | 3 ++- .../component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java | 3 ++- .../org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java | 3 ++- .../java/org/apache/camel/component/directvm/DirectVmProcessor.java | 3 ++- .../camel/component/disruptor/AbstractSynchronizedExchange.java | 5 +++-- .../java/org/apache/camel/component/disruptor/DisruptorConsumer.java | 5 +++-- .../java/org/apache/camel/component/disruptor/DisruptorProducer.java | 5 +++-- .../disruptor/DisruptorInOutChainedWithOnCompletionTest.java | 3 ++- .../disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java | 3 ++- .../disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java | 3 ++- .../main/java/org/apache/camel/component/elsql/ElsqlProducer.java | 3 ++- .../java/org/apache/camel/component/file/GenericFileConsumer.java | 3 ++- .../org/apache/camel/component/file/remote/RemoteFileConsumer.java | 3 ++- .../camel/component/google/mail/stream/GoogleMailStreamConsumer.java | 3 ++- .../apache/camel/component/google/pubsub/GooglePubsubConsumer.java | 3 ++- .../src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java | 3 ++- .../src/main/java/org/apache/camel/component/http/HttpProducer.java | 3 ++- .../org/apache/camel/component/ignite/cache/IgniteCacheProducer.java | 3 ++- .../main/java/org/apache/camel/component/ironmq/IronMQConsumer.java | 3 ++- .../src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java | 3 ++- .../src/main/java/org/apache/camel/component/jpa/JpaHelper.java | 5 +++-- .../java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java | 3 ++- .../src/main/java/org/apache/camel/component/mail/MailConsumer.java | 3 ++- .../apache/camel/component/netty/http/DefaultNettyHttpBinding.java | 5 +++-- .../org/apache/camel/component/netty/http/NettyHttpProducer.java | 3 ++- .../main/java/org/apache/camel/component/netty/NettyProducer.java | 3 ++- .../src/main/java/org/apache/camel/component/nsq/NsqConsumer.java | 3 ++- .../component/pg/replication/slot/PgReplicationSlotConsumer.java | 3 ++- .../reactive/streams/engine/DefaultCamelReactiveStreamsService.java | 3 ++- .../apache/camel/component/reactor/engine/ReactorStreamsService.java | 3 ++- .../apache/camel/component/rxjava/engine/RxJavaStreamsService.java | 3 ++- .../src/main/java/org/apache/camel/component/seda/SedaConsumer.java | 5 +++-- .../src/main/java/org/apache/camel/component/seda/SedaProducer.java | 5 +++-- .../org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java | 3 ++- .../src/main/java/org/apache/camel/component/sql/SqlProducer.java | 3 ++- .../org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java | 3 ++- .../camel/processor/aggregate/tarfile/TarAggregationStrategy.java | 3 ++- .../src/main/java/org/apache/camel/component/vm/VmConsumer.java | 3 ++- .../src/main/java/org/apache/camel/component/xslt/XsltBuilder.java | 3 ++- .../camel/processor/aggregate/zipfile/ZipAggregationStrategy.java | 3 ++- 47 files changed, 102 insertions(+), 55 deletions(-) diff --git a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java index 55a21c7..b1d1b49 100644 --- a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java +++ b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java @@ -31,6 +31,7 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; @@ -152,7 +153,7 @@ public class S3Consumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } diff --git a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java index c9d1c63..6f6b2cc 100644 --- a/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java +++ b/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java @@ -26,6 +26,7 @@ import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -189,7 +190,7 @@ public class S3Endpoint extends ScheduledPollEndpoint { IOHelper.close(s3Object); } else { if (configuration.isAutocloseBody()) { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { IOHelper.close(s3Object); diff --git a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 2d57f97..3df7fa9 100644 --- a/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws-sqs/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -37,6 +37,7 @@ import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; @@ -165,7 +166,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { cancelExtender(exchange); @@ -185,7 +186,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } diff --git a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java index c9ecb30..2a9ee42 100644 --- a/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java +++ b/components/camel-beanstalk/src/main/java/org/apache/camel/component/beanstalk/BeanstalkConsumer.java @@ -25,6 +25,7 @@ import com.surftools.BeanstalkClient.Client; import com.surftools.BeanstalkClient.Job; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.beanstalk.processors.BuryCommand; @@ -114,7 +115,7 @@ public class BeanstalkConsumer extends ScheduledPollConsumer { if (!awaitJob) { client.delete(job.getJobId()); } else { - exchange.addOnCompletion(sync); + exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); } return exchange; diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java index 6428224..7797766 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/processor/aggregate/cassandra/CassandraCamelCodec.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.DefaultExchangeHolder; @@ -64,7 +65,7 @@ public class CassandraCamelCodec { if (fromEndpointUri != null) { Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri); if (fromEndpoint != null) { - answer.setFromEndpoint(fromEndpoint); + answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint); } } return answer; diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java index 19d197f..c6cfb95 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.util.IOHelper; import org.apache.commons.csv.CSVFormat; @@ -129,7 +130,7 @@ abstract class CsvUnmarshaller { CSVParser parser = new CSVParser(reader, format); CsvIterator answer = new CsvIterator(parser, converter); // add to UoW so we can close the iterator so it can release any resources - exchange.addOnCompletion(new CsvUnmarshalOnCompletion(answer)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new CsvUnmarshalOnCompletion(answer)); return answer; } catch (Exception e) { IOHelper.close(reader); diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java index 5364a97..f532e14 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerClientDisconnectedTest.java @@ -20,6 +20,7 @@ import java.io.BufferedWriter; import java.io.OutputStreamWriter; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.spi.Synchronization; @@ -54,7 +55,7 @@ public class CxfConsumerClientDisconnectedTest extends CamelTestSupport { .process(exchange-> { Thread.sleep(100); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java index 38fc677..b48f1d2 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerStreamCacheTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.cxf; +import org.apache.camel.ExtendedExchange; import org.w3c.dom.Node; import org.apache.camel.Exchange; @@ -66,7 +67,7 @@ public class CxfConsumerStreamCacheTest extends CamelTestSupport { cos.close(); exchange.getOut().setBody(cos.newStreamCache()); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java index c9d16a1..13248ad 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java @@ -20,6 +20,7 @@ import java.io.BufferedWriter; import java.io.OutputStreamWriter; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.CXFTestSupport; import org.apache.camel.component.mock.MockEndpoint; @@ -55,7 +56,7 @@ public class CxfRsConsumerClientDisconnectedTest extends CamelTestSupport { .process(exchange-> { Thread.sleep(100); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java index 7b78d84..471ea2a 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.cxf.jaxrs; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.CXFTestSupport; import org.apache.camel.component.cxf.jaxrs.testbean.Customer; @@ -62,7 +63,7 @@ public class CxfRsStreamCacheTest extends CamelTestSupport { cos.close(); exchange.getOut().setBody(cos.newStreamCache()); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java index 3a434f3..266d111 100644 --- a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java +++ b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProcessor.java @@ -18,6 +18,7 @@ package org.apache.camel.component.directvm; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; @@ -87,7 +88,7 @@ public final class DirectVmProcessor extends DelegateAsyncProcessor { // send a new copied exchange with new camel context (do not handover completions) Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false); // set the from endpoint - newExchange.setFromEndpoint(endpoint); + newExchange.adapt(ExtendedExchange.class).setFromEndpoint(endpoint); // The StreamCache created by the child routes must not be // closed by the unit of work of the child route, but by the unit of // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java index 8069970..3709e41 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java @@ -19,6 +19,7 @@ package org.apache.camel.component.disruptor; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.UnitOfWorkHelper; import org.slf4j.Logger; @@ -31,7 +32,7 @@ public abstract class AbstractSynchronizedExchange implements SynchronizedExchan public AbstractSynchronizedExchange(Exchange exchange) { this.exchange = exchange; - synchronizations = exchange.handoverCompletions(); + synchronizations = exchange.adapt(ExtendedExchange.class).handoverCompletions(); } @Override @@ -43,7 +44,7 @@ public abstract class AbstractSynchronizedExchange implements SynchronizedExchan public Exchange cancelAndGetOriginalExchange() { if (synchronizations != null) { for (Synchronization synchronization : synchronizations) { - exchange.addOnCompletion(synchronization); + exchange.adapt(ExtendedExchange.class).addOnCompletion(synchronization); } } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index 31e90ff..77b74e4 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Consumer; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.Suspendable; @@ -138,7 +139,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe final Exchange newExchange = ExchangeHelper .copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false); // set the from endpoint - newExchange.setFromEndpoint(endpoint); + newExchange.adapt(ExtendedExchange.class).setFromEndpoint(endpoint); return newExchange; } @@ -163,7 +164,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe // (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done). // To solve this problem, a new synchronization is set on the exchange that is to be // processed - result.addOnCompletion(new Synchronization() { + result.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { synchronizedExchange.consumed(result); diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java index 9e736d0..88ae473 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java @@ -23,6 +23,7 @@ import com.lmax.disruptor.InsufficientCapacityException; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.ExtendedExchange; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -81,7 +82,7 @@ public class DisruptorProducer extends DefaultAsyncProducer { final CountDownLatch latch = new CountDownLatch(1); // we should wait for the reply so install a on completion so we know when its complete - copy.addOnCompletion(new SynchronizationAdapter() { + copy.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange response) { // check for timeout, which then already would have invoked the latch @@ -193,7 +194,7 @@ public class DisruptorProducer extends DefaultAsyncProducer { // use a new copy of the exchange to route async final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover); // set a new from endpoint to be the disruptor - copy.setFromEndpoint(endpoint); + copy.adapt(ExtendedExchange.class).setFromEndpoint(endpoint); return copy; } } diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java index 811e8f9..4604985 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.disruptor; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -46,7 +47,7 @@ public class DisruptorInOutChainedWithOnCompletionTest extends CamelTestSupport @Override public void process(final Exchange exchange) throws Exception { // should come in last - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { template.sendBody("mock:c", "onCustomCompletion"); diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java index 9d524f2..01f7e26 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.disruptor; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -56,7 +57,7 @@ public class DisruptorWaitForTaskCompleteOnCompletionTest extends CamelTestSuppo from("direct:start").process(new Processor() { @Override public void process(final Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { done += "A"; diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java index adad0de..5f8acea 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -56,7 +57,7 @@ public class DisruptorWaitForTaskNeverOnCompletionTest extends CamelTestSupport from("direct:start").process(new Processor() { @Override public void process(final Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { done = done + "A"; diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java index 3077cd0..fdefbd7 100644 --- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java +++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java @@ -28,6 +28,7 @@ import javax.sql.DataSource; import com.opengamma.elsql.ElSql; import com.opengamma.elsql.SpringSqlParams; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.component.sql.ResultSetIterator; import org.apache.camel.component.sql.ResultSetIteratorCompletion; import org.apache.camel.component.sql.SqlConstants; @@ -219,7 +220,7 @@ public class ElsqlProducer extends DefaultProducer { } // we do not know the row count so we cannot set a ROW_COUNT header // defer closing the iterator when the exchange is complete - exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); } } catch (final Exception e) { // in case of exception then close all this before rethrow diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 76022f4..56d52ff 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -440,7 +441,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // register on completion callback that does the completion strategies // (for instance to move the file after we have processed it) - exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName)); log.debug("About to process file: {} using exchange: {}", target, exchange); diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 0a62c7d..25573fb 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; @@ -106,7 +107,7 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { // defer disconnect til the UoW is complete - but only the last exchange from the batch should do that boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, Boolean.class); if (isLast && getEndpoint().isDisconnect()) { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { log.trace("processExchange disconnect from: {}", getEndpoint()); diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java index 953ca13..d3c25fb 100644 --- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java +++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java @@ -29,6 +29,7 @@ import com.google.api.services.gmail.model.ModifyMessageRequest; import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ScheduledBatchPollingConsumer; @@ -108,7 +109,7 @@ public class GoogleMailStreamConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange, unreadLabelId); } diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index bfd78cf..3f47ff93 100644 --- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -27,6 +27,7 @@ import com.google.api.services.pubsub.model.PullRequest; import com.google.api.services.pubsub.model.PullResponse; import com.google.api.services.pubsub.model.ReceivedMessage; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.google.pubsub.consumer.ExchangeAckTransaction; import org.apache.camel.spi.Synchronization; @@ -147,7 +148,7 @@ class GooglePubsubConsumer extends DefaultConsumer { } if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - exchange.addOnCompletion(GooglePubsubConsumer.this.ackStrategy); + exchange.adapt(ExtendedExchange.class).addOnCompletion(GooglePubsubConsumer.this.ackStrategy); } try { diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 8e105cd..34f2fb3 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -236,7 +237,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) { // do not share unit of work - exchange.setUnitOfWork(null); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(null); exchange.setProperty(Exchange.SPLIT_INDEX, index); diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java index 77ee082..716be67 100644 --- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java +++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.http.helper.HttpMethodHelper; @@ -213,7 +214,7 @@ public class HttpProducer extends DefaultProducer { final HttpResponse response = httpResponse; if (httpResponse != null && getEndpoint().isDisableStreamCache()) { // close the stream at the end of the exchange to ensure it gets eventually closed later - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { try { diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java index 11dfc0a..f048d14 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; @@ -141,7 +142,7 @@ public class IgniteCacheProducer extends DefaultAsyncProducer { out.setBody(cursor.iterator()); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onFailure(Exchange exchange) { cursor.close(); diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java index fbb27d2..414edaf 100644 --- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java @@ -24,6 +24,7 @@ import io.iron.ironmq.Message; import io.iron.ironmq.Messages; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ExchangeHelper; @@ -96,7 +97,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer { // add on completion to handle after work when the exchange is done // if batchDelete is not enabled if (!getEndpoint().getConfiguration().isBatchDelete()) { - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { final String reservationId = ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_RESERVATION_ID, String.class); final String messageid = ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_ID, String.class); diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java index 27df28e..dc2c37d 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java @@ -31,6 +31,7 @@ import java.util.Map; import javax.sql.DataSource; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.PropertyBindingSupport; @@ -321,7 +322,7 @@ public class JdbcProducer extends DefaultProducer { exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, iterator.getColumnNames()); if (outputType == JdbcOutputType.StreamList) { exchange.getOut().setBody(new StreamListIterator(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator)); - exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); // do not close resources as we are in streaming mode answer = false; } else if (outputType == JdbcOutputType.SelectList) { diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java index 5161b9a..87fb03d 100644 --- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java +++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaHelper.java @@ -20,6 +20,7 @@ import javax.persistence.EntityManager; import javax.persistence.EntityManagerFactory; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.springframework.orm.jpa.SharedEntityManagerCreator; /** @@ -64,7 +65,7 @@ public final class JpaHelper { if (exchange != null) { // we want to reuse the EM so store as property and make sure we close it when done with the exchange exchange.setProperty(JpaConstants.ENTITY_MANAGER, em); - exchange.addOnCompletion(new JpaCloseEntityManagerOnCompletion(em)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new JpaCloseEntityManagerOnCompletion(em)); } } @@ -74,7 +75,7 @@ public final class JpaHelper { if (exchange != null) { // we want to reuse the EM so store as property and make sure we close it when done with the exchange exchange.setProperty(JpaConstants.ENTITY_MANAGER, em); - exchange.addOnCompletion(new JpaCloseEntityManagerOnCompletion(em)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new JpaCloseEntityManagerOnCompletion(em)); } } diff --git a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java index 23d9df1..4ad646a 100644 --- a/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java +++ b/components/camel-leveldb/src/main/java/org/apache/camel/component/leveldb/LevelDBCamelCodec.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.DefaultExchangeHolder; import org.fusesource.hawtbuf.Buffer; @@ -78,7 +79,7 @@ public final class LevelDBCamelCodec { if (fromEndpointUri != null) { Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri); if (fromEndpoint != null) { - answer.setFromEndpoint(fromEndpoint); + answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint); } } return answer; diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index ac4b306..d010332 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -37,6 +37,7 @@ import com.sun.mail.imap.IMAPStore; import com.sun.mail.imap.SortTerm; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.attachment.Attachment; @@ -202,7 +203,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { final Message mail = exchange.getIn(MailMessage.class).getOriginalMessage(); // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { public void onComplete(Exchange exchange) { processCommit(mail, exchange); } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index 016a2f6..c1b6ac6 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -46,6 +46,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.ReferenceCountUtil; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; @@ -103,7 +104,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // for proxy use case pass the request body buffer directly to the response to avoid additional processing // we need to retain it so that the request can be released and we can keep the content answer.setBody(request.content().retain()); - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { ReferenceCountUtil.release(request.content()); @@ -113,7 +114,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // turn the body into stream cached (on the client/consumer side we can facade the netty stream instead of converting to byte array) NettyChannelBufferStreamCache cache = new NettyChannelBufferStreamCache(request.content()); // add on completion to the cache which is needed for Camel to keep track of the lifecycle of the cache - exchange.addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache)); answer.setBody(cache); } return answer; diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java index 5ecc8a9..87732a3 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java @@ -26,6 +26,7 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.util.ReferenceCountUtil; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.component.netty.NettyConfiguration; import org.apache.camel.component.netty.NettyConstants; import org.apache.camel.component.netty.NettyProducer; @@ -118,7 +119,7 @@ public class NettyHttpProducer extends NettyProducer { response.content().retain(); // need to release the response when we are done - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { if (response.refCnt() > 0) { diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 08887ce..1bd3e6e 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -43,6 +43,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.CamelLogger; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -264,7 +265,7 @@ public class NettyProducer extends DefaultAsyncProducer { if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) { exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel); // and defer closing the channel until we are done routing the exchange - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { // should channel be closed after complete? diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java index 5165d59..12e0a0d 100644 --- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java +++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java @@ -26,6 +26,7 @@ import com.github.brainlag.nsq.lookup.DefaultNSQLookup; import com.github.brainlag.nsq.lookup.NSQLookup; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; import org.slf4j.Logger; @@ -109,7 +110,7 @@ public class NsqConsumer extends DefaultConsumer { if (configuration.getAutoFinish()) { msg.finished(); } else { - exchange.addOnCompletion(new NsqSynchronization(msg, (int)configuration.getRequeueInterval())); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new NsqSynchronization(msg, (int)configuration.getRequeueInterval())); } processor.process(exchange); } catch (Exception e) { diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java index 4390ae4..05609c6 100644 --- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java +++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; @@ -136,7 +137,7 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer { } }, delay, delay, TimeUnit.SECONDS); - exchange.addOnCompletion(new Synchronization() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { processCommit(exchange); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java index 19d50a6..b34cd76 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java @@ -35,6 +35,7 @@ import javax.management.openmbean.TabularType; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; @@ -171,7 +172,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool); - data.addOnCompletion(new Synchronization() { + data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { publisher.setData(exchange); diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java index fee248d..7640274 100644 --- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java +++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -314,7 +315,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv } return Mono.<Exchange>create( - sink -> data.addOnCompletion(new Synchronization() { + sink -> data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { sink.success(exchange); diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java index 50e83a1..75e8ec9 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java @@ -25,6 +25,7 @@ import io.reactivex.Flowable; import io.reactivex.Single; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -304,7 +305,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive } Single<Exchange> source = Single.<Exchange>create( - emitter -> data.addOnCompletion(new Synchronization() { + emitter -> data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { emitter.onSuccess(exchange); diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index e6c2bde..0c7b6fe 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.Suspendable; @@ -220,7 +221,7 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA // send a new copied exchange with new camel context Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getEndpoint().getCamelContext()); // set the from endpoint - newExchange.setFromEndpoint(getEndpoint()); + newExchange.adapt(ExtendedExchange.class).setFromEndpoint(getEndpoint()); return newExchange; } @@ -250,7 +251,7 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA } // handover completions, as we need to done this when the multicast is done - final List<Synchronization> completions = exchange.handoverCompletions(); + final List<Synchronization> completions = exchange.adapt(ExtendedExchange.class).handoverCompletions(); // use a multicast processor to process it AsyncProcessor mp = getEndpoint().getConsumerMulticastProcessor(); diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java index a0ea010..1906d97 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.ExtendedExchange; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -67,7 +68,7 @@ public class SedaProducer extends DefaultAsyncProducer { final CountDownLatch latch = new CountDownLatch(1); // we should wait for the reply so install a on completion so we know when its complete - copy.addOnCompletion(new SynchronizationAdapter() { + copy.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange response) { // check for timeout, which then already would have invoked the latch @@ -165,7 +166,7 @@ public class SedaProducer extends DefaultAsyncProducer { Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true, synchronization -> !synchronization.getClass().getName().contains("RestBindingMarshalOnCompletion")); // set a new from endpoint to be the seda queue - copy.setFromEndpoint(endpoint); + copy.adapt(ExtendedExchange.class).setFromEndpoint(endpoint); return copy; } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index ec6168e..7990e4d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -38,6 +38,7 @@ import javax.jms.Session; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -564,7 +565,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { aggregationStrategy.onCompletion(exchange); SessionCompletion sessionCompletion = new SessionCompletion(session); - exchange.addOnCompletion(sessionCompletion); + exchange.adapt(ExtendedExchange.class).addOnCompletion(sessionCompletion); try { getProcessor().process(exchange); long total = MESSAGE_PROCESSED.addAndGet(batchSize); diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index 13d50fd..484660a 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.support.DefaultProducer; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCallback; @@ -294,7 +295,7 @@ public class SqlProducer extends DefaultProducer { } // we do not know the row count so we cannot set a ROW_COUNT header // defer closing the iterator when the exchange is complete - exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); } } catch (Exception e) { // in case of exception then close all this before rethrow diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java index 619fcd5..e44fec9 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java @@ -27,6 +27,7 @@ import java.io.OutputStream; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.DefaultExchangeHolder; import org.apache.camel.util.IOHelper; @@ -75,7 +76,7 @@ public class JdbcCamelCodec { if (fromEndpointUri != null) { Endpoint fromEndpoint = camelContext.hasEndpoint(fromEndpointUri); if (fromEndpoint != null) { - answer.setFromEndpoint(fromEndpoint); + answer.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint); } } return answer; diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java index 23a670e..e38e285 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java @@ -26,6 +26,7 @@ import java.nio.file.Files; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.WrappedFile; import org.apache.camel.component.file.FileConsumer; import org.apache.camel.component.file.GenericFile; @@ -149,7 +150,7 @@ public class TarAggregationStrategy implements AggregationStrategy { throw new GenericFileOperationFailedException(e.getMessage(), e); } answer = newExchange; - answer.addOnCompletion(new DeleteTarFileOnCompletion(tarFile)); + answer.adapt(ExtendedExchange.class).addOnCompletion(new DeleteTarFileOnCompletion(tarFile)); } else { tarFile = oldExchange.getIn().getBody(File.class); } diff --git a/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java b/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java index f82b9e1..6e9430c 100644 --- a/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java +++ b/components/camel-vm/src/main/java/org/apache/camel/component/vm/VmConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.vm; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.seda.SedaConsumer; import org.apache.camel.support.ExchangeHelper; @@ -52,7 +53,7 @@ public class VmConsumer extends SedaConsumer implements CamelContextAware { // send a new copied exchange with the camel context from this consumer Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getCamelContext()); // set the from endpoint - newExchange.setFromEndpoint(getEndpoint()); + newExchange.adapt(ExtendedExchange.class).setFromEndpoint(getEndpoint()); return newExchange; } diff --git a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java index 2eaabcb..13d24a3 100644 --- a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java +++ b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java @@ -37,6 +37,7 @@ import javax.xml.transform.URIResolver; import javax.xml.transform.sax.SAXSource; import javax.xml.transform.stream.StreamSource; +import org.apache.camel.ExtendedExchange; import org.xml.sax.EntityResolver; import org.apache.camel.Exchange; @@ -95,7 +96,7 @@ public class XsltBuilder implements Processor { if (isDeleteOutputFile()) { // add on completion so we can delete the file when the Exchange is done String fileName = ExchangeHelper.getMandatoryHeader(exchange, Exchange.XSLT_FILE_NAME, String.class); - exchange.addOnCompletion(new XsltBuilderOnCompletion(fileName)); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new XsltBuilderOnCompletion(fileName)); } Transformer transformer = getTransformer(); diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java index 04ae295..d20eec1 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -31,6 +31,7 @@ import java.util.Map; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.WrappedFile; import org.apache.camel.component.file.FileConsumer; import org.apache.camel.component.file.GenericFile; @@ -162,7 +163,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { throw new GenericFileOperationFailedException(e.getMessage(), e); } answer = newExchange; - answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); + answer.adapt(ExtendedExchange.class).addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); } else { zipFile = oldExchange.getIn().getBody(File.class); }