This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 04ac187cf48b8998f91028d83631966425735308 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Nov 23 13:48:29 2022 +0100 CAMEL-15105: rework handling addOnCompletions on Exchanges in camel-core module --- .../apache/camel/impl/engine/CamelInternalProcessor.java | 4 ++-- .../org/apache/camel/impl/engine/DefaultUnitOfWork.java | 2 +- .../camel/processor/aggregate/AggregateProcessor.java | 2 +- .../camel/processor/idempotent/IdempotentConsumer.java | 3 +-- .../apache/camel/processor/resume/ResumableProcessor.java | 3 +-- .../camel/impl/engine/CamelPostProcessorHelperTest.java | 5 ++--- .../java/org/apache/camel/issues/GertJBIIssueTest.java | 7 ++++--- .../apache/camel/processor/EnrichWithUnitOfWorkTest.java | 5 ++--- .../camel/processor/MDCOnCompletionOnCompletionTest.java | 3 +-- .../apache/camel/processor/OnCompletionContainsTest.java | 15 +++++++-------- .../camel/processor/OnCompletionShouldBeLastTest.java | 7 +++---- .../camel/processor/RouteAwareSynchronizationTest.java | 3 +-- .../camel/processor/async/AsyncEndpointDelayUoWTest.java | 7 ++++--- .../camel/processor/async/AsyncEndpointUoWFailedTest.java | 8 +++++--- .../camel/processor/async/AsyncEndpointUoWTest.java | 7 ++++--- .../camel/converter/stream/FileInputStreamCache.java | 3 +-- 16 files changed, 40 insertions(+), 44 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 74f5d9fbf25..29f97a3840c 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -602,7 +602,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In messageAsXml, messageAsJSon); backlogTracer.traceEvent(pseudoFirst); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { // create pseudo last @@ -1076,7 +1076,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In boolean contains = exchange.getUnitOfWork().containsSynchronization(tracingAfterRoute); if (!contains) { tracer.traceBeforeRoute(routeDefinition, exchange); - exchange.adapt(ExtendedExchange.class).addOnCompletion(tracingAfterRoute); + exchange.getExchangeExtension().addOnCompletion(tracingAfterRoute); } } tracer.traceBeforeNode(processorDefinition, exchange); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index ea1640933a6..5ddfd782329 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -212,7 +212,7 @@ public class DefaultUnitOfWork implements UnitOfWork { if (handover && (filter == null || filter.test(synchronization))) { log.trace("Handover synchronization {} to: {}", synchronization, target); - target.adapt(ExtendedExchange.class).addOnCompletion(synchronization); + target.getExchangeExtension().addOnCompletion(synchronization); // Allow the synchronization to do housekeeping before transfer if (veto != null) { veto.beforeHandover(target); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 306fbdb8263..b7a4e9a7a6d 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -876,7 +876,7 @@ public class AggregateProcessor extends AsyncProcessorSupport LOG.debug("Processing aggregated exchange: {}", exchange); // add on completion task so we remember to update the inProgressCompleteExchanges - exchange.adapt(ExtendedExchange.class).addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); + exchange.getExchangeExtension().addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); // send this exchange executorService.execute(() -> { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index a425c4f341d..423c9157ccc 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -27,7 +27,6 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.spi.IdAware; @@ -163,7 +162,7 @@ public class IdempotentConsumer extends AsyncProcessorSupport // we can use existing callback as target target = callback; // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + exchange.getExchangeExtension().addOnCompletion(onCompletion); } } catch (Exception e) { exchange.setException(e); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java index a41ee86d96d..553a1bb9150 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java @@ -25,7 +25,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.LoggingLevel; import org.apache.camel.Navigate; import org.apache.camel.Processor; @@ -81,7 +80,7 @@ public class ResumableProcessor extends AsyncProcessorSupport public boolean process(final Exchange exchange, final AsyncCallback callback) { final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel, intermittent); - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + exchange.getExchangeExtension().addOnCompletion(onCompletion); return processor.process(exchange, callback); } diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java index 1a69e6a4f43..0781641f881 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java @@ -28,7 +28,6 @@ import org.apache.camel.Consume; import org.apache.camel.ContextTestSupport; import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.FluentProducerTemplate; import org.apache.camel.NoSuchBeanException; import org.apache.camel.NoSuchEndpointException; @@ -566,7 +565,7 @@ public class CamelPostProcessorHelperTest extends ContextTestSupport { @Consume("seda:foo") public void consumeSomething(String body, Exchange exchange) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(mySynchronization); + exchange.getExchangeExtension().addOnCompletion(mySynchronization); assertEquals("Hello World", body); template.sendBody("mock:result", body); } @@ -581,7 +580,7 @@ public class CamelPostProcessorHelperTest extends ContextTestSupport { assertEquals("Hello World", body); Exchange exchange = producer.getEndpoint().createExchange(); - exchange.adapt(ExtendedExchange.class).addOnCompletion(mySynchronization); + exchange.getExchangeExtension().addOnCompletion(mySynchronization); exchange.getIn().setBody(body); producer.process(exchange); } diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java index c5a53c845a1..1c3f4905e02 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java @@ -21,13 +21,14 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class GertJBIIssueTest extends ContextTestSupport { @@ -77,7 +78,7 @@ public class GertJBIIssueTest extends ContextTestSupport { template.send("direct:start", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { cause = exchange.getException(); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichWithUnitOfWorkTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichWithUnitOfWorkTest.java index d6ce342c330..620c4a4d39f 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichWithUnitOfWorkTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichWithUnitOfWorkTest.java @@ -18,7 +18,6 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -50,7 +49,7 @@ public class EnrichWithUnitOfWorkTest extends ContextTestSupport { @Override public void process(Exchange exchange) throws Exception { exchange.getMessage().setBody("Hello World"); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { exchange.getMessage().setBody("Done " + exchange.getMessage().getBody()); @@ -80,7 +79,7 @@ public class EnrichWithUnitOfWorkTest extends ContextTestSupport { @Override public void process(Exchange exchange) throws Exception { exchange.getMessage().setBody("Hello World"); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { exchange.getMessage().setBody("Done " + exchange.getMessage().getBody()); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java index e1b7fc317a4..09f1afd64b2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java @@ -18,7 +18,6 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -54,7 +53,7 @@ public class MDCOnCompletionOnCompletionTest extends ContextTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new MyOnCompletion()); + exchange.getExchangeExtension().addOnCompletion(new MyOnCompletion()); } }).end().to("log:foo").to("direct:b"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java index 1fa79f1122a..664992a3bb4 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java @@ -18,7 +18,6 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -66,26 +65,26 @@ public class OnCompletionContainsTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { SynchronizationAdapter adapter = new SimpleSynchronizationAdapter("mock:sync", "A"); - exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); + exchange.getExchangeExtension().addOnCompletion(adapter); // should not add the adapter again as we already have // it - if (!exchange.adapt(ExtendedExchange.class).containsOnCompletion(adapter)) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); + if (!exchange.getExchangeExtension().containsOnCompletion(adapter)) { + exchange.getExchangeExtension().addOnCompletion(adapter); } adapter = new SimpleSynchronizationAdapter("mock:sync", "B"); - exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); + exchange.getExchangeExtension().addOnCompletion(adapter); // now add the B again as we want to test that this also // work - if (exchange.adapt(ExtendedExchange.class).containsOnCompletion(adapter)) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); + if (exchange.getExchangeExtension().containsOnCompletion(adapter)) { + exchange.getExchangeExtension().addOnCompletion(adapter); } // add a C that is no a SimpleSynchronizationAdapter // class - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "C"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java index e55e6a3b32d..352d5dc2806 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java @@ -18,7 +18,6 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -45,7 +44,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "A"); @@ -57,7 +56,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { } }); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "B"); @@ -69,7 +68,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { } }); - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "C"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java index f553b22c986..cf76f57c53c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.builder.RouteBuilder; @@ -46,7 +45,7 @@ public class RouteAwareSynchronizationTest extends ContextTestSupport { template.send("direct:start", new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new MyRouteAware()); + exchange.getExchangeExtension().addOnCompletion(new MyRouteAware()); exchange.getIn().setBody("Hello World"); } }); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java index 06450db0d98..d0a612c863e 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java @@ -20,13 +20,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AsyncEndpointDelayUoWTest extends ContextTestSupport { @@ -61,7 +62,7 @@ public class AsyncEndpointDelayUoWTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); + exchange.getExchangeExtension().addOnCompletion(sync); } }).to("mock:before").to("log:before").delay(500).asyncDelayed().process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java index 55987d1e7b9..1bfbe7e6e7d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java @@ -21,13 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class AsyncEndpointUoWFailedTest extends ContextTestSupport { @@ -69,7 +71,7 @@ public class AsyncEndpointUoWFailedTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); + exchange.getExchangeExtension().addOnCompletion(sync); } }).to("mock:before").to("log:before").to("async:bye:camel").process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java index 8c26a15ecb1..a7ed9463839 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java @@ -20,13 +20,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AsyncEndpointUoWTest extends ContextTestSupport { @@ -63,7 +64,7 @@ public class AsyncEndpointUoWTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); + exchange.getExchangeExtension().addOnCompletion(sync); } }).to("mock:before").to("log:before").to("async:bye:camel").process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index da4ea64e87c..c44abd8f7a4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -35,7 +35,6 @@ import javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; -import org.apache.camel.ExtendedExchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.spi.StreamCachingStrategy; @@ -278,7 +277,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac streamCacheUnitOfWork.addSynchronization(onCompletion); } else { // add on completion so we can cleanup after the exchange is done such as deleting temporary files - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + exchange.getExchangeExtension().addOnCompletion(onCompletion); } } }