This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 43a6574cbd699733dd174f60dcc5a87c68068e21 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Nov 23 13:51:12 2022 +0100 CAMEL-15105: rework handling addOnCompletions on Exchanges in several components --- .../org/apache/camel/component/aws2/s3/AWS2S3Consumer.java | 5 ++--- .../org/apache/camel/component/aws2/sqs/Sqs2Consumer.java | 5 ++--- .../camel/component/azure/cosmosdb/CosmosDbConsumer.java | 3 +-- .../camel/component/azure/eventhubs/EventHubsConsumer.java | 3 +-- .../component/azure/servicebus/ServiceBusConsumer.java | 3 +-- .../camel/component/azure/storage/blob/BlobConsumer.java | 3 +-- .../component/azure/storage/datalake/DataLakeConsumer.java | 3 +-- .../camel/component/azure/storage/queue/QueueConsumer.java | 3 +-- .../org/apache/camel/dataformat/csv/CsvUnmarshaller.java | 3 +-- .../cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java | 3 +-- .../camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java | 3 +-- .../apache/camel/component/cxf/jaxws/DefaultCxfBinding.java | 3 +-- .../cxf/jaxws/CxfConsumerClientDisconnectedTest.java | 3 +-- .../component/cxf/jaxws/CxfConsumerStreamCacheTest.java | 3 +-- .../component/disruptor/AbstractSynchronizedExchange.java | 5 ++--- .../apache/camel/component/disruptor/DisruptorConsumer.java | 2 +- .../apache/camel/component/disruptor/DisruptorProducer.java | 2 +- .../DisruptorInOutChainedWithOnCompletionTest.java | 3 +-- .../DisruptorWaitForTaskCompleteOnCompletionTest.java | 3 +-- .../DisruptorWaitForTaskNeverOnCompletionTest.java | 3 +-- .../apache/camel/component/file/GenericFileConsumer.java | 3 +-- .../camel/component/file/remote/RemoteFileConsumer.java | 3 +-- .../google/mail/stream/GoogleMailStreamConsumer.java | 3 +-- .../camel/component/google/pubsub/GooglePubsubConsumer.java | 3 +-- .../google/pubsub/consumer/CamelMessageReceiver.java | 3 +-- .../google/storage/GoogleCloudStorageConsumer.java | 3 +-- .../java/org/apache/camel/component/http/HttpProducer.java | 3 +-- .../apache/camel/component/huaweicloud/obs/OBSConsumer.java | 3 +-- .../camel/component/ignite/cache/IgniteCacheProducer.java | 3 +-- .../org/apache/camel/component/ironmq/IronMQConsumer.java | 3 +-- .../java/org/apache/camel/component/jdbc/JdbcProducer.java | 5 ++--- .../component/kamelet/KameletConsumerUoWIssueTest.java | 3 +-- .../apache/camel/component/kamelet/KameletUoWIssueTest.java | 3 +-- .../java/org/apache/camel/component/mail/MailConsumer.java | 3 +-- .../org/apache/camel/component/minio/MinioConsumer.java | 5 ++--- .../camel/component/netty/http/DefaultNettyHttpBinding.java | 4 ++-- .../camel/component/netty/http/NettyHttpProducer.java | 2 +- .../org/apache/camel/component/netty/NettyProducer.java | 3 +-- .../pg/replication/slot/PgReplicationSlotConsumer.java | 3 +-- .../streams/engine/DefaultCamelReactiveStreamsService.java | 3 +-- .../component/reactor/engine/ReactorStreamsService.java | 3 +-- .../camel/component/rxjava/engine/RxJavaStreamsService.java | 3 +-- .../java/org/apache/camel/component/seda/SedaProducer.java | 3 +-- .../java/org/apache/camel/component/sjms/SjmsTemplate.java | 13 ++++++++----- .../java/org/apache/camel/component/sql/SqlProducer.java | 3 +-- .../processor/aggregate/tarfile/TarAggregationStrategy.java | 2 +- .../java/org/apache/camel/component/xslt/XsltBuilder.java | 4 ++-- .../processor/aggregate/zipfile/ZipAggregationStrategy.java | 2 +- .../camel/component/seda/SedaDiscardIfNoConsumerTest.java | 3 +-- .../seda/SedaInOutChainedWithOnCompletionTest.java | 3 +-- .../seda/SedaWaitForTaskCompleteOnCompletionTest.java | 3 +-- .../seda/SedaWaitForTaskNewerOnCompletionTest.java | 3 +-- 52 files changed, 66 insertions(+), 107 deletions(-) diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index b19c85b5abf..ce801418763 100644 --- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -28,7 +28,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -283,7 +282,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } @@ -437,7 +436,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { IOHelper.close(s3Object); } else { if (getConfiguration().isAutocloseBody()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { IOHelper.close(s3Object); diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index f9e5e0627a8..a945c838e4b 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -31,7 +31,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.health.HealthCheckHelper; @@ -188,7 +187,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { final ScheduledFuture<?> scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate( new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { cancelExtender(exchange); @@ -209,7 +208,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { } // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { processCommit(exchange); diff --git a/components/camel-azure/camel-azure-cosmosdb/src/main/java/org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer.java b/components/camel-azure/camel-azure-cosmosdb/src/main/java/org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer.java index b252ca5d537..91e67820f67 100644 --- a/components/camel-azure/camel-azure-cosmosdb/src/main/java/org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer.java +++ b/components/camel-azure/camel-azure-cosmosdb/src/main/java/org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer.java @@ -22,7 +22,6 @@ import java.util.Map; import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.implementation.apachecommons.lang.RandomStringUtils; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.azure.cosmosdb.client.CosmosAsyncClientWrapper; @@ -87,7 +86,7 @@ public class CosmosDbConsumer extends DefaultConsumer { final Exchange exchange = createAzureCosmosDbExchange(record); // add exchange callback - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + exchange.getExchangeExtension().addOnCompletion(onCompletion); // use default consumer callback getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); } diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index e9f43681954..89b9a71b518 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -24,7 +24,6 @@ import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory; @@ -127,7 +126,7 @@ public class EventHubsConsumer extends DefaultConsumer { final Exchange exchange = createAzureEventHubExchange(eventContext); // add exchange callback - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { // we update the consumer offsets diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java index fcf1f5c6385..2719a0187b9 100644 --- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java +++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java @@ -23,7 +23,6 @@ import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -141,7 +140,7 @@ public class ServiceBusConsumer extends DefaultConsumer { final Exchange exchange = createServiceBusExchange(message); // add exchange callback - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + exchange.getExchangeExtension().addOnCompletion(onCompletion); // use default consumer callback AsyncCallback cb = defaultConsumerCallback(exchange, true); getAsyncProcessor().process(exchange, cb); diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java index 1cf39cd1593..be69d8b7f1e 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java @@ -27,7 +27,6 @@ import com.azure.storage.blob.models.BlobStorageException; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.azure.storage.blob.client.BlobClientWrapper; import org.apache.camel.component.azure.storage.blob.client.BlobContainerClientWrapper; @@ -139,7 +138,7 @@ public class BlobConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { LOG.trace("Completed from processing all exchanges..."); diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java index 7fc5981b384..f288230e078 100644 --- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java +++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java @@ -28,7 +28,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper; import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileSystemClientWrapper; @@ -138,7 +137,7 @@ public class DataLakeConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - i - 1; - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { LOG.trace("Processing all exchanges completed"); diff --git a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java index d48ab826327..85441a97c39 100644 --- a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java +++ b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java @@ -29,7 +29,6 @@ import com.azure.storage.queue.models.QueueStorageException; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper; @@ -129,7 +128,7 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer { final Duration timeout = exchange.getIn().getHeader(QueueConstants.TIMEOUT, Duration.class); // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { // past messageId, popReceipt, timeout for fix exchange override case diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java index 945d4d2788a..a76039c7044 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvUnmarshaller.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.util.IOHelper; import org.apache.commons.csv.CSVFormat; @@ -140,7 +139,7 @@ abstract class CsvUnmarshaller { CSVParser parser = new CSVParser(reader, format); CsvIterator answer = new CsvIterator(parser, converter); // add to UoW so we can close the iterator so it can release any resources - exchange.adapt(ExtendedExchange.class).addOnCompletion(new CsvUnmarshalOnCompletion(answer)); + exchange.getExchangeExtension().addOnCompletion(new CsvUnmarshalOnCompletion(answer)); return answer; } catch (Exception e) { IOHelper.close(reader); diff --git a/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java index d41ef5e2ac5..84d35ed86a8 100644 --- a/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java +++ b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsConsumerClientDisconnectedTest.java @@ -20,7 +20,6 @@ import java.io.BufferedWriter; import java.io.OutputStreamWriter; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.common.CXFTestSupport; import org.apache.camel.component.mock.MockEndpoint; @@ -56,7 +55,7 @@ public class CxfRsConsumerClientDisconnectedTest extends CamelTestSupport { .process(exchange -> { Thread.sleep(100); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java index fef5033eff3..a7829a3af0f 100644 --- a/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java +++ b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsStreamCacheTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.cxf.jaxrs; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.common.CXFTestSupport; import org.apache.camel.component.cxf.jaxrs.testbean.Customer; @@ -65,7 +64,7 @@ public class CxfRsStreamCacheTest extends CamelTestSupport { cos.close(); exchange.getMessage().setBody(cos.newStreamCache()); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/DefaultCxfBinding.java b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/DefaultCxfBinding.java index 768331285b2..a2c234e1989 100644 --- a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/DefaultCxfBinding.java +++ b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/DefaultCxfBinding.java @@ -51,7 +51,6 @@ import org.w3c.dom.Node; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.attachment.AttachmentMessage; import org.apache.camel.attachment.DefaultAttachment; import org.apache.camel.component.cxf.common.CxfBinding; @@ -223,7 +222,7 @@ public class DefaultCxfBinding implements CxfBinding, HeaderFilterStrategyAware * @param cxfExchange */ private void addAttachmentFileCloseUoW(Exchange camelExchange, org.apache.cxf.message.Exchange cxfExchange) { - camelExchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + camelExchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(org.apache.camel.Exchange exchange) { Collection<Attachment> atts = cxfExchange.getInMessage().getAttachments(); diff --git a/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerClientDisconnectedTest.java b/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerClientDisconnectedTest.java index fd3e68f48da..b2d9a58cdb9 100644 --- a/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerClientDisconnectedTest.java +++ b/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerClientDisconnectedTest.java @@ -20,7 +20,6 @@ import java.io.BufferedWriter; import java.io.OutputStreamWriter; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.common.CXFTestSupport; import org.apache.camel.component.mock.MockEndpoint; @@ -56,7 +55,7 @@ public class CxfConsumerClientDisconnectedTest extends CamelTestSupport { .process(exchange -> { Thread.sleep(100); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerStreamCacheTest.java b/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerStreamCacheTest.java index 2a164a6859d..a13988a03cb 100644 --- a/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerStreamCacheTest.java +++ b/components/camel-cxf/camel-cxf-soap/src/test/java/org/apache/camel/component/cxf/jaxws/CxfConsumerStreamCacheTest.java @@ -19,7 +19,6 @@ package org.apache.camel.component.cxf.jaxws; import org.w3c.dom.Node; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; @@ -73,7 +72,7 @@ public class CxfConsumerStreamCacheTest extends CamelTestSupport { cos.close(); exchange.getMessage().setBody(cos.newStreamCache()); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { template.sendBody("mock:onComplete", ""); diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java index 3709e4193d3..0018aca72e2 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/AbstractSynchronizedExchange.java @@ -19,7 +19,6 @@ package org.apache.camel.component.disruptor; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.UnitOfWorkHelper; import org.slf4j.Logger; @@ -32,7 +31,7 @@ public abstract class AbstractSynchronizedExchange implements SynchronizedExchan public AbstractSynchronizedExchange(Exchange exchange) { this.exchange = exchange; - synchronizations = exchange.adapt(ExtendedExchange.class).handoverCompletions(); + synchronizations = exchange.getExchangeExtension().handoverCompletions(); } @Override @@ -44,7 +43,7 @@ public abstract class AbstractSynchronizedExchange implements SynchronizedExchan public Exchange cancelAndGetOriginalExchange() { if (synchronizations != null) { for (Synchronization synchronization : synchronizations) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(synchronization); + exchange.getExchangeExtension().addOnCompletion(synchronization); } } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index b9734afee15..cee5b483814 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -164,7 +164,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe // (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done). // To solve this problem, a new synchronization is set on the exchange that is to be // processed - result.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + result.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { synchronizedExchange.consumed(result); diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java index 529fe901b30..a35ba77cf9b 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java @@ -86,7 +86,7 @@ public class DisruptorProducer extends DefaultAsyncProducer { final CountDownLatch latch = new CountDownLatch(1); // we should wait for the reply so install a on completion so we know when its complete - copy.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + copy.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange response) { // check for timeout, which then already would have invoked the latch diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java index a7eab138ca0..a0f1f89a202 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.disruptor; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -50,7 +49,7 @@ public class DisruptorInOutChainedWithOnCompletionTest extends CamelTestSupport @Override public void process(final Exchange exchange) { // should come in last - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { template.sendBody("mock:c", "onCustomCompletion"); diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java index 8b8b1add1a3..97b57fdf272 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java @@ -18,7 +18,6 @@ package org.apache.camel.component.disruptor; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -62,7 +61,7 @@ public class DisruptorWaitForTaskCompleteOnCompletionTest extends CamelTestSuppo from("direct:start").process(new Processor() { @Override public void process(final Exchange exchange) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { done += "A"; diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java index 53e0e51a71c..934eada31b6 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java @@ -20,7 +20,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -61,7 +60,7 @@ public class DisruptorWaitForTaskNeverOnCompletionTest extends CamelTestSupport from("direct:start").process(new Processor() { @Override public void process(final Exchange exchange) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(final Exchange exchange) { done = done + "A"; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 1b7b8adf562..ef3c28a2d71 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -27,7 +27,6 @@ import java.util.regex.Pattern; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -475,7 +474,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // register on completion callback that does the completion // strategies // (for instance to move the file after we have processed it) - exchange.adapt(ExtendedExchange.class).addOnCompletion( + exchange.getExchangeExtension().addOnCompletion( new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName)); LOG.debug("About to process file: {} using exchange: {}", target, exchange); diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 9b1486256a8..a829fd5c3e8 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -20,7 +20,6 @@ import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; @@ -122,7 +121,7 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { // from the batch should do that boolean isLast = exchange.getProperty(ExchangePropertyKey.BATCH_COMPLETE, true, Boolean.class); if (isLast && getEndpoint().isDisconnect()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { LOG.trace("processExchange disconnect from: {}", getEndpoint()); diff --git a/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java index 425189d1bd9..d65e4ea86be 100644 --- a/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java +++ b/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java @@ -33,7 +33,6 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.EmptyAsyncCallback; @@ -114,7 +113,7 @@ public class GoogleMailStreamConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange, unreadLabelId); } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index ef3dc08a443..1d9d5012360 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -39,7 +39,6 @@ import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync; import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver; @@ -195,7 +194,7 @@ public class GooglePubsubConsumer extends DefaultConsumer { //existing subscriber can not be propagated, because it will be closed at the end of this block //subscriber will be created at the moment of use // (see https://issues.apache.org/jira/browse/CAMEL-18447) - exchange.adapt(ExtendedExchange.class) + exchange.getExchangeExtension() .addOnCompletion(new AcknowledgeSync( () -> endpoint.getComponent().getSubscriberStub(endpoint), subscriptionName)); } diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index d26d649af7a..470d7396d80 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -21,7 +21,6 @@ import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.common.base.Strings; import com.google.pubsub.v1.PubsubMessage; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.google.pubsub.GooglePubsubConstants; import org.apache.camel.component.google.pubsub.GooglePubsubConsumer; @@ -64,7 +63,7 @@ public class CamelMessageReceiver implements MessageReceiver { } if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new AcknowledgeAsync(ackReplyConsumer)); + exchange.getExchangeExtension().addOnCompletion(new AcknowledgeAsync(ackReplyConsumer)); } try { diff --git a/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java b/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java index 6f839702b55..7b488098706 100644 --- a/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java +++ b/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java @@ -33,7 +33,6 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -198,7 +197,7 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer { pendingExchanges = total - index - 1; // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } diff --git a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java index 04aa2d8191d..cad827fc62d 100644 --- a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java +++ b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java @@ -37,7 +37,6 @@ import java.util.Map.Entry; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.TypeConverter; import org.apache.camel.component.file.GenericFile; @@ -298,7 +297,7 @@ public class HttpProducer extends DefaultProducer { final HttpResponse response = httpResponse; if (httpResponse != null && getEndpoint().isDisableStreamCache()) { // close the stream at the end of the exchange to ensure it gets eventually closed later - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { try { diff --git a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java index a0f3d8ca44e..89412cdb101 100644 --- a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java +++ b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java @@ -31,7 +31,6 @@ import com.obs.services.model.ObsObject; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.component.huaweicloud.obs.constants.OBSHeaders; import org.apache.camel.spi.Synchronization; @@ -167,7 +166,7 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer { // update number of pending exchanges pendingExchanges = total - index - 1; - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { processComplete(exchange); diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java index 59422e6e679..ebcb4f5552b 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java @@ -22,7 +22,6 @@ import java.util.Set; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; @@ -144,7 +143,7 @@ public class IgniteCacheProducer extends DefaultAsyncProducer { out.setBody(cursor.iterator()); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onFailure(Exchange exchange) { cursor.close(); diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java index 21a21cc9eca..6bad0e60fe5 100644 --- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java @@ -25,7 +25,6 @@ import io.iron.ironmq.Messages; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.spi.ScheduledPollConsumerScheduler; import org.apache.camel.spi.Synchronization; @@ -122,7 +121,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer { // add on completion to handle after work when the exchange is done // if batchDelete is not enabled if (!getEndpoint().getConfiguration().isBatchDelete()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { final String reservationId = ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_RESERVATION_ID, String.class); final String messageid diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java index a774d6249f2..03edc8fb1ab 100644 --- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java +++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java @@ -31,7 +31,6 @@ import java.util.Map; import javax.sql.DataSource; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.PropertyBindingSupport; @@ -203,7 +202,7 @@ public class JdbcProducer extends DefaultProducer { // are not using try-with-resources here. final Statement stmt = conn.createStatement(); // ensure statement is closed (to not leak) when exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { closeQuietly(stmt); @@ -346,7 +345,7 @@ public class JdbcProducer extends DefaultProducer { .setBody(new StreamListIterator( getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator)); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); + exchange.getExchangeExtension().addOnCompletion(new ResultSetIteratorCompletion(iterator)); // do not close resources as we are in streaming mode answer = false; } else if (outputType == JdbcOutputType.SelectList) { diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumerUoWIssueTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumerUoWIssueTest.java index 9494eaa06cb..6eff0c9d778 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumerUoWIssueTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumerUoWIssueTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.kamelet; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; @@ -54,7 +53,7 @@ public class KameletConsumerUoWIssueTest extends CamelTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { super.onDone(exchange); diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletUoWIssueTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletUoWIssueTest.java index 919fa226d4d..eda294564d4 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletUoWIssueTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletUoWIssueTest.java @@ -17,7 +17,6 @@ package org.apache.camel.component.kamelet; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; @@ -54,7 +53,7 @@ public class KameletUoWIssueTest extends CamelTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { super.onDone(exchange); diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java index ffde63a2068..3304f10b1c3 100644 --- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java +++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java @@ -39,7 +39,6 @@ import com.sun.mail.imap.SortTerm; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.attachment.Attachment; @@ -235,7 +234,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer { final Message mail = exchange.getIn(MailMessage.class).getOriginalMessage(); // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { processCommit(mail, exchange); diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java index f693a53a79f..49647d77814 100644 --- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java +++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java @@ -38,7 +38,6 @@ import io.minio.errors.MinioException; import io.minio.messages.Item; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; @@ -272,7 +271,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { minioObject = getObject(srcBucketName, getMinioClient(), srcObjectName); exchange.getIn().setBody(IOUtils.toByteArray(minioObject)); if (getConfiguration().isAutoCloseBody()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { IOHelper.close(minioObject); @@ -286,7 +285,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer { } // add on completion to handle after work when the exchange is done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java index e3750efb9d9..b1e949593ab 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java @@ -105,7 +105,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // we need to retain it so that the request can be released and we can keep the content answer.setBody(request.content().retain()); answer.getExchange().adapt(ExtendedExchange.class).setStreamCacheDisabled(true); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { ReferenceCountUtil.release(request.content()); @@ -115,7 +115,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable { // turn the body into stream cached (on the client/consumer side we can facade the netty stream instead of converting to byte array) NettyChannelBufferStreamCache cache = new NettyChannelBufferStreamCache(request.content()); // add on completion to the cache which is needed for Camel to keep track of the lifecycle of the cache - exchange.adapt(ExtendedExchange.class).addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache)); + exchange.getExchangeExtension().addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache)); answer.setBody(cache); } return answer; diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java index 0baa9b67b36..e615b624cd9 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpProducer.java @@ -148,7 +148,7 @@ public class NettyHttpProducer extends NettyProducer { response.content().retain(); // need to release the response when we are done - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { if (response.refCnt() > 0) { diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index 0683f07ad17..71a4a23bb5a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -47,7 +47,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.CamelLogger; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -291,7 +290,7 @@ public class NettyProducer extends DefaultAsyncProducer { channel.attr(CORRELATION_MANAGER_ATTR).set(correlationManager); exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel); // and defer closing the channel until we are done routing the exchange - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onComplete(Exchange exchange) { // should channel be closed after complete? diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java index ad278b0e62c..28b11762148 100644 --- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java +++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java @@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.Synchronization; @@ -141,7 +140,7 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer { } }, delay, delay, TimeUnit.SECONDS); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { processCommit(exchange); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java index f5a15b2ad12..c77c84e2d5c 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java @@ -35,7 +35,6 @@ import javax.management.openmbean.TabularType; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; @@ -175,7 +174,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool); - data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + data.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { publisher.setData(exchange); diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java index dbc7f8ffe11..407dddcf820 100644 --- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java +++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java @@ -23,7 +23,6 @@ import java.util.function.Supplier; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -316,7 +315,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv } return Mono.<Exchange> create( - sink -> data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + sink -> data.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { sink.success(exchange); diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java index bb3623ab313..7dcf1a5df23 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java @@ -25,7 +25,6 @@ import io.reactivex.Flowable; import io.reactivex.Single; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -304,7 +303,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive } Single<Exchange> source = Single.<Exchange> create( - emitter -> data.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { + emitter -> data.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { emitter.onSuccess(exchange); diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java index c7a65142957..f75b47b7c09 100644 --- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; -import org.apache.camel.ExtendedExchange; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.ExchangeHelper; @@ -69,7 +68,7 @@ public class SedaProducer extends DefaultAsyncProducer { final CountDownLatch latch = new CountDownLatch(1); // we should wait for the reply so install a on completion so we know when its complete - copy.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + copy.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange response) { // check for timeout, which then already would have invoked the latch diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java index e56211edc9b..12bdc59b822 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java @@ -26,14 +26,18 @@ import jakarta.jms.MessageProducer; import jakarta.jms.Session; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.component.sjms.jms.DestinationCreationStrategy; import org.apache.camel.component.sjms.jms.JmsConstants; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.component.sjms.jms.MessageCreator; import org.apache.camel.util.ObjectHelper; -import static org.apache.camel.component.sjms.SjmsHelper.*; +import static org.apache.camel.component.sjms.SjmsHelper.closeConnection; +import static org.apache.camel.component.sjms.SjmsHelper.closeConsumer; +import static org.apache.camel.component.sjms.SjmsHelper.closeProducer; +import static org.apache.camel.component.sjms.SjmsHelper.closeSession; +import static org.apache.camel.component.sjms.SjmsHelper.commitIfNeeded; +import static org.apache.camel.component.sjms.SjmsHelper.isTransactionOrClientAcknowledgeMode; public class SjmsTemplate { @@ -148,10 +152,9 @@ public class SjmsTemplate { try { if (transacted) { // defer closing till end of UoW - ExtendedExchange ecc = exchange.adapt(ExtendedExchange.class); TransactionOnCompletion toc = new TransactionOnCompletion(session, this.message); - if (!ecc.containsOnCompletion(toc)) { - ecc.addOnCompletion(toc); + if (!exchange.getExchangeExtension().containsOnCompletion(toc)) { + exchange.getExchangeExtension().addOnCompletion(toc); } } else { closeSession(session); diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java index 7af835a77fb..1852c8f4cbf 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.support.DefaultProducer; import org.apache.camel.support.ResourceHelper; import org.slf4j.Logger; @@ -231,7 +230,7 @@ public class SqlProducer extends DefaultProducer { // we do not know the row count so we cannot set a ROW_COUNT header // defer closing the iterator when the exchange is complete - exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator)); + exchange.getExchangeExtension().addOnCompletion(new ResultSetIteratorCompletion(iterator)); } return iterator; } catch (Exception e) { diff --git a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java index 76710b322dd..0699f05b227 100644 --- a/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java +++ b/components/camel-tarfile/src/main/java/org/apache/camel/processor/aggregate/tarfile/TarAggregationStrategy.java @@ -153,7 +153,7 @@ public class TarAggregationStrategy implements AggregationStrategy { throw new GenericFileOperationFailedException(e.getMessage(), e); } answer = newExchange; - answer.adapt(ExtendedExchange.class).addOnCompletion(new DeleteTarFileOnCompletion(tarFile)); + answer.getExchangeExtension().addOnCompletion(new DeleteTarFileOnCompletion(tarFile)); } else { tarFile = oldExchange.getIn().getBody(File.class); } diff --git a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java index 0fc6904e955..dbe4c8f669f 100644 --- a/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java +++ b/components/camel-xslt/src/main/java/org/apache/camel/component/xslt/XsltBuilder.java @@ -40,7 +40,6 @@ import javax.xml.transform.stream.StreamSource; import org.xml.sax.EntityResolver; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.support.ExchangeHelper; @@ -51,6 +50,7 @@ import org.apache.camel.util.IOHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.camel.util.ObjectHelper.notNull; /** @@ -96,7 +96,7 @@ public class XsltBuilder implements Processor { if (isDeleteOutputFile()) { // add on completion so we can delete the file when the Exchange is done String fileName = ExchangeHelper.getMandatoryHeader(exchange, XsltConstants.XSLT_FILE_NAME, String.class); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new XsltBuilderOnCompletion(fileName)); + exchange.getExchangeExtension().addOnCompletion(new XsltBuilderOnCompletion(fileName)); } Transformer transformer = getTransformer(); diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java index 9d8d3d3c54d..b13df829839 100644 --- a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -170,7 +170,7 @@ public class ZipAggregationStrategy implements AggregationStrategy { throw new GenericFileOperationFailedException(e.getMessage(), e); } answer = newExchange; - answer.adapt(ExtendedExchange.class).addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); + answer.getExchangeExtension().addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); } else { zipFile = oldExchange.getIn().getBody(File.class); } diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java index 709e1eec559..07dd34a5a21 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java @@ -18,7 +18,6 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -56,7 +55,7 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Hello World"); - exchange.adapt(ExtendedExchange.class).addOnCompletion(myCompletion); + exchange.getExchangeExtension().addOnCompletion(myCompletion); } }); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java index f420c82a030..62158d5e6a0 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java @@ -18,7 +18,6 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -50,7 +49,7 @@ public class SedaInOutChainedWithOnCompletionTest extends ContextTestSupport { from("seda:a").process(new Processor() { public void process(Exchange exchange) throws Exception { // should come in last - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:c", "onCustomCompletion"); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java index 2f15f4c0287..c12891081bb 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java @@ -19,7 +19,6 @@ package org.apache.camel.component.seda; import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -60,7 +59,7 @@ public class SedaWaitForTaskCompleteOnCompletionTest extends ContextTestSupport from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { done = done + "A"; diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java index af99379528b..13dc1b15bd3 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -59,7 +58,7 @@ public class SedaWaitForTaskNewerOnCompletionTest extends ContextTestSupport { from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { done = done + "A";