This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 207e7dfdd4d3890ccf4f394853f4f6a2a64ef06d Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Jan 16 14:59:38 2020 +0100 CAMEL-14409: camel-core - ExtendedExchange for advanced API --- .../src/main/java/org/apache/camel/Exchange.java | 60 ---------------------- .../java/org/apache/camel/ExtendedExchange.java | 58 +++++++++++++++++++++ .../camel/impl/engine/DefaultConsumerTemplate.java | 3 +- .../camel/impl/engine/DefaultUnitOfWork.java | 3 +- .../camel/processor/CamelInternalProcessor.java | 6 +-- .../java/org/apache/camel/processor/Enricher.java | 3 +- .../org/apache/camel/processor/PollEnricher.java | 3 +- .../java/org/apache/camel/processor/Splitter.java | 3 +- .../processor/aggregate/AggregateProcessor.java | 3 +- .../processor/idempotent/IdempotentConsumer.java | 3 +- .../apache/camel/builder/xml/XsltBuilderTest.java | 3 +- .../seda/SedaDiscardIfNoConsumerTest.java | 3 +- .../seda/SedaInOutChainedWithOnCompletionTest.java | 3 +- .../SedaWaitForTaskCompleteOnCompletionTest.java | 3 +- .../seda/SedaWaitForTaskNewerOnCompletionTest.java | 3 +- .../converter/stream/CachedOutputStreamTest.java | 3 +- .../impl/engine/CamelPostProcessorHelperTest.java | 5 +- .../org/apache/camel/issues/GertJBIIssueTest.java | 3 +- .../apache/camel/language/simple/SimpleTest.java | 3 +- .../processor/MDCOnCompletionOnCompletionTest.java | 3 +- .../camel/processor/OnCompletionContainsTest.java | 15 +++--- .../processor/OnCompletionShouldBeLastTest.java | 7 +-- .../processor/RouteAwareSynchronizationTest.java | 3 +- .../processor/async/AsyncEndpointDelayUoWTest.java | 3 +- .../async/AsyncEndpointUoWFailedTest.java | 3 +- .../processor/async/AsyncEndpointUoWTest.java | 3 +- .../converter/stream/FileInputStreamCache.java | 3 +- .../org/apache/camel/support/DefaultConsumer.java | 5 +- .../org/apache/camel/support/DefaultExchange.java | 3 +- .../org/apache/camel/support/ExchangeHelper.java | 5 +- .../org/apache/camel/support/UnitOfWorkHelper.java | 3 +- 31 files changed, 127 insertions(+), 103 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index 6a8fa26..49ab34e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -554,43 +554,18 @@ public interface Exchange { Endpoint getFromEndpoint(); /** - * Sets the endpoint which originated this message exchange. This method - * should typically only be called by {@link org.apache.camel.Endpoint} implementations - * - * @param fromEndpoint the endpoint which is originating this message exchange - */ - // TODO: Move to ExtendedExchange - void setFromEndpoint(Endpoint fromEndpoint); - - /** * Returns the route id which originated this message exchange if a route consumer on an endpoint * created the message exchange, otherwise this property will be <tt>null</tt> */ String getFromRouteId(); /** - * Sets the route id which originated this message exchange. This method - * should typically only be called by the internal framework. - * - * @param fromRouteId the from route id - */ - // TODO: Move to ExtendedExchange - void setFromRouteId(String fromRouteId); - - /** * Returns the unit of work that this exchange belongs to; which may map to * zero, one or more physical transactions */ UnitOfWork getUnitOfWork(); /** - * Sets the unit of work that this exchange belongs to; which may map to - * zero, one or more physical transactions - */ - // TODO: Move to ExtendedExchange - void setUnitOfWork(UnitOfWork unitOfWork); - - /** * Returns the exchange id (unique) */ String getExchangeId(); @@ -601,41 +576,6 @@ public interface Exchange { void setExchangeId(String id); /** - * Adds a {@link org.apache.camel.spi.Synchronization} to be invoked as callback when - * this exchange is completed. - * - * @param onCompletion the callback to invoke on completion of this exchange - */ - // TODO: Move to ExtendedExchange - void addOnCompletion(Synchronization onCompletion); - - /** - * Checks if the passed {@link org.apache.camel.spi.Synchronization} instance is - * already contained on this exchange. - * - * @param onCompletion the callback instance that is being checked for - * @return <tt>true</tt>, if callback instance is already contained on this exchange, else <tt>false</tt> - */ - // TODO: Move to ExtendedExchange - boolean containsOnCompletion(Synchronization onCompletion); - - /** - * Handover all the on completions from this exchange to the target exchange. - * - * @param target the target exchange - */ - // TODO: Move to ExtendedExchange - void handoverCompletions(Exchange target); - - /** - * Handover all the on completions from this exchange - * - * @return the on completions - */ - // TODO: Move to ExtendedExchange - List<Synchronization> handoverCompletions(); - - /** * Gets the timestamp in millis when this exchange was created. */ long getCreated(); diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index 718bdb0..2b07540 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -1,5 +1,10 @@ package org.apache.camel; +import java.util.List; + +import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.UnitOfWork; + /** * Extended {@link Exchange} which contains the methods and APIs that are not primary intended for Camel end users * but for SPI, custom components, or more advanced used-cases with Camel. @@ -7,6 +12,59 @@ package org.apache.camel; public interface ExtendedExchange extends Exchange { /** + * Sets the endpoint which originated this message exchange. This method + * should typically only be called by {@link Endpoint} implementations + * + * @param fromEndpoint the endpoint which is originating this message exchange + */ + void setFromEndpoint(Endpoint fromEndpoint); + + /** + * Sets the route id which originated this message exchange. This method + * should typically only be called by the internal framework. + * + * @param fromRouteId the from route id + */ + void setFromRouteId(String fromRouteId); + + /** + * Sets the unit of work that this exchange belongs to; which may map to + * zero, one or more physical transactions + */ + void setUnitOfWork(UnitOfWork unitOfWork); + + /** + * Adds a {@link org.apache.camel.spi.Synchronization} to be invoked as callback when + * this exchange is completed. + * + * @param onCompletion the callback to invoke on completion of this exchange + */ + void addOnCompletion(Synchronization onCompletion); + + /** + * Checks if the passed {@link Synchronization} instance is + * already contained on this exchange. + * + * @param onCompletion the callback instance that is being checked for + * @return <tt>true</tt>, if callback instance is already contained on this exchange, else <tt>false</tt> + */ + boolean containsOnCompletion(Synchronization onCompletion); + + /** + * Handover all the on completions from this exchange to the target exchange. + * + * @param target the target exchange + */ + void handoverCompletions(Exchange target); + + /** + * Handover all the on completions from this exchange + * + * @return the on completions + */ + List<Synchronization> handoverCompletions(); + + /** * Sets the history node id (the current processor that will process the exchange) */ void setHistoryNodeId(String historyNodeId); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultConsumerTemplate.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultConsumerTemplate.java index 3acec66..b5d6652 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultConsumerTemplate.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultConsumerTemplate.java @@ -22,6 +22,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.ConsumerTemplate; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.CamelContextHelper; @@ -224,7 +225,7 @@ public class DefaultConsumerTemplate extends ServiceSupport implements ConsumerT } if (exchange.getUnitOfWork() == null) { // handover completions and done them manually to ensure they are being executed - List<Synchronization> synchronizations = exchange.handoverCompletions(); + List<Synchronization> synchronizations = exchange.adapt(ExtendedExchange.class).handoverCompletions(); UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log); } else { // done the unit of work diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 4f08567..a96df0b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -28,6 +28,7 @@ import java.util.function.Predicate; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Route; @@ -185,7 +186,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { if (handover && (filter == null || filter.test(synchronization))) { log.trace("Handover synchronization {} to: {}", synchronization, target); - target.addOnCompletion(synchronization); + target.adapt(ExtendedExchange.class).addOnCompletion(synchronization); // remove it if its handed over it.remove(); } else { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 5cfd5e0..9c9164f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -594,7 +594,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { if (routeId == null) { this.routeId = routeContext.getRouteId(); } - exchange.setFromRouteId(routeId); + exchange.adapt(ExtendedExchange.class).setFromRouteId(routeId); } // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW @@ -604,7 +604,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // If there is no existing UoW, then we should start one and // terminate it once processing is completed for the exchange. created = createUnitOfWork(exchange); - exchange.setUnitOfWork(created); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(created); created.start(); } @@ -856,7 +856,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { if (!contains) { added = true; tracer.traceBeforeRoute(routeDefinition, exchange); - exchange.addOnCompletion(tracingAfterRoute); + exchange.adapt(ExtendedExchange.class).addOnCompletion(tracingAfterRoute); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index efd8b67..411b36a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -27,6 +27,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; +import org.apache.camel.ExtendedExchange; import org.apache.camel.impl.engine.DefaultProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; @@ -326,7 +327,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA if (isShareUnitOfWork()) { target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork()); // and then share the unit of work - target.setUnitOfWork(source.getUnitOfWork()); + target.adapt(ExtendedExchange.class).setUnitOfWork(source.getUnitOfWork()); } return target; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java index c441146..828e3ef 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -25,6 +25,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedExchange; import org.apache.camel.PollingConsumer; import org.apache.camel.impl.engine.DefaultConsumerCache; import org.apache.camel.spi.ConsumerCache; @@ -290,7 +291,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout copyResultsPreservePattern(exchange, aggregatedExchange); // handover any synchronization if (resourceExchange != null) { - resourceExchange.handoverCompletions(exchange); + resourceExchange.adapt(ExtendedExchange.class).handoverCompletions(exchange); } } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java index 7250144..0f09d12 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java @@ -31,6 +31,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -239,7 +240,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac @Override protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, boolean hasNext) { // do not share unit of work - exchange.setUnitOfWork(null); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(null); exchange.setProperty(Exchange.SPLIT_INDEX, index); if (allPairs instanceof Collection) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index f894765..4924359 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -42,6 +42,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Predicate; @@ -823,7 +824,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat log.debug("Processing aggregated exchange: {}", exchange); // add on completion task so we remember to update the inProgressCompleteExchanges - exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); // send this exchange // the call to schedule last if needed to ensure in-order processing of the aggregates diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index a9de078..b126382 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -26,6 +26,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.spi.IdAware; @@ -149,7 +150,7 @@ public class IdempotentConsumer extends AsyncProcessorSupport implements CamelCo target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager); if (!completionEager) { // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed - exchange.addOnCompletion(onCompletion); + exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); } } catch (Exception e) { exchange.setException(e); diff --git a/core/camel-core/src/test/java/org/apache/camel/builder/xml/XsltBuilderTest.java b/core/camel-core/src/test/java/org/apache/camel/builder/xml/XsltBuilderTest.java index ba16332..1f56420 100644 --- a/core/camel-core/src/test/java/org/apache/camel/builder/xml/XsltBuilderTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/builder/xml/XsltBuilderTest.java @@ -26,6 +26,7 @@ import javax.xml.transform.Source; import javax.xml.transform.Templates; import javax.xml.transform.sax.SAXSource; +import org.apache.camel.ExtendedExchange; import org.w3c.dom.Document; import org.xml.sax.InputSource; @@ -257,7 +258,7 @@ public class XsltBuilderTest extends ContextTestSupport { assertTrue(body.endsWith("<goodbye>world!</goodbye>")); // now done the exchange - List<Synchronization> onCompletions = exchange.handoverCompletions(); + List<Synchronization> onCompletions = exchange.adapt(ExtendedExchange.class).handoverCompletions(); UnitOfWorkHelper.doneSynchronizations(exchange, onCompletions, log); // the file should be deleted diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java index 95a39cd..893ca0d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -52,7 +53,7 @@ public class SedaDiscardIfNoConsumerTest extends ContextTestSupport { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Hello World"); - exchange.addOnCompletion(myCompletion); + exchange.adapt(ExtendedExchange.class).addOnCompletion(myCompletion); } }); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java index c2bc872..2ac0802 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedWithOnCompletionTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.seda; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -47,7 +48,7 @@ public class SedaInOutChainedWithOnCompletionTest extends ContextTestSupport { from("seda:a").process(new Processor() { public void process(Exchange exchange) throws Exception { // should come in last - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:c", "onCustomCompletion"); diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java index 42e55b6..c987a0d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteOnCompletionTest.java @@ -19,6 +19,7 @@ package org.apache.camel.component.seda; import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -56,7 +57,7 @@ public class SedaWaitForTaskCompleteOnCompletionTest extends ContextTestSupport from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { done = done + "A"; diff --git a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java index 412fb90..535d7b2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerOnCompletionTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -55,7 +56,7 @@ public class SedaWaitForTaskNewerOnCompletionTest extends ContextTestSupport { from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { done = done + "A"; diff --git a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java index 6ff67e0..6713c77 100644 --- a/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java @@ -26,6 +26,7 @@ import java.io.InputStreamReader; import org.apache.camel.CamelContext; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.StreamCache; import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.engine.DefaultUnitOfWork; @@ -60,7 +61,7 @@ public class CachedOutputStreamTest extends ContextTestSupport { exchange = new DefaultExchange(context); UnitOfWork uow = new DefaultUnitOfWork(exchange); - exchange.setUnitOfWork(uow); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow); } @Override diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java index cdbf5c0..ebe5477 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/CamelPostProcessorHelperTest.java @@ -27,6 +27,7 @@ import org.apache.camel.Consume; import org.apache.camel.ContextTestSupport; import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.FluentProducerTemplate; import org.apache.camel.NoSuchBeanException; import org.apache.camel.PollingConsumer; @@ -506,7 +507,7 @@ public class CamelPostProcessorHelperTest extends ContextTestSupport { @Consume("seda:foo") public void consumeSomething(String body, Exchange exchange) { - exchange.addOnCompletion(mySynchronization); + exchange.adapt(ExtendedExchange.class).addOnCompletion(mySynchronization); assertEquals("Hello World", body); template.sendBody("mock:result", body); } @@ -521,7 +522,7 @@ public class CamelPostProcessorHelperTest extends ContextTestSupport { assertEquals("Hello World", body); Exchange exchange = producer.getEndpoint().createExchange(); - exchange.addOnCompletion(mySynchronization); + exchange.adapt(ExtendedExchange.class).addOnCompletion(mySynchronization); exchange.getIn().setBody(body); producer.process(exchange); } diff --git a/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java b/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java index 697950b..01a7309 100644 --- a/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/issues/GertJBIIssueTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -74,7 +75,7 @@ public class GertJBIIssueTest extends ContextTestSupport { template.send("direct:start", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { cause = exchange.getException(); diff --git a/core/camel-core/src/test/java/org/apache/camel/language/simple/SimpleTest.java b/core/camel-core/src/test/java/org/apache/camel/language/simple/SimpleTest.java index 311f1a8..a2a15e6 100644 --- a/core/camel-core/src/test/java/org/apache/camel/language/simple/SimpleTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/language/simple/SimpleTest.java @@ -32,6 +32,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.ExpressionIllegalSyntaxException; +import org.apache.camel.ExtendedExchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.LanguageTestSupport; import org.apache.camel.Predicate; @@ -217,7 +218,7 @@ public class SimpleTest extends LanguageTestSupport { assertExpression("${header.foo}", "abc"); assertExpression("${headers.foo}", "abc"); assertExpression("${routeId}", exchange.getFromRouteId()); - exchange.setFromRouteId("myRouteId"); + exchange.adapt(ExtendedExchange.class).setFromRouteId("myRouteId"); assertExpression("${routeId}", "myRouteId"); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java index e2ef8d5..05a9cd1 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCOnCompletionOnCompletionTest.java @@ -18,6 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -50,7 +51,7 @@ public class MDCOnCompletionOnCompletionTest extends ContextTestSupport { from("timer:foo?period=5000").routeId("route-a").setBody().constant("Hello World").onCompletion().process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new MyOnCompletion()); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new MyOnCompletion()); } }).end().to("log:foo").to("direct:b"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java index a084bf3..0a45e35 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java @@ -18,6 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -65,26 +66,26 @@ public class OnCompletionContainsTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { SynchronizationAdapter adapter = new SimpleSynchronizationAdapter("mock:sync", "A"); - exchange.addOnCompletion(adapter); + exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); // should not add the adapter again as we already have // it - if (!exchange.containsOnCompletion(adapter)) { - exchange.addOnCompletion(adapter); + if (!exchange.adapt(ExtendedExchange.class).containsOnCompletion(adapter)) { + exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); } adapter = new SimpleSynchronizationAdapter("mock:sync", "B"); - exchange.addOnCompletion(adapter); + exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); // now add the B again as we want to test that this also // work - if (exchange.containsOnCompletion(adapter)) { - exchange.addOnCompletion(adapter); + if (exchange.adapt(ExtendedExchange.class).containsOnCompletion(adapter)) { + exchange.adapt(ExtendedExchange.class).addOnCompletion(adapter); } // add a C that is no a SimpleSynchronizationAdapter // class - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "C"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java index d360527..1ea974b 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/OnCompletionShouldBeLastTest.java @@ -18,6 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -44,7 +45,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "A"); @@ -56,7 +57,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { } }); - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "B"); @@ -68,7 +69,7 @@ public class OnCompletionShouldBeLastTest extends ContextTestSupport { } }); - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { template.sendBody("mock:sync", "C"); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java index 5f1c81c..bb0338e 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareSynchronizationTest.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.builder.RouteBuilder; @@ -43,7 +44,7 @@ public class RouteAwareSynchronizationTest extends ContextTestSupport { template.send("direct:start", new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.addOnCompletion(new MyRouteAware()); + exchange.adapt(ExtendedExchange.class).addOnCompletion(new MyRouteAware()); exchange.getIn().setBody("Hello World"); } }); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java index 1f5c6f2..b0f82df 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -58,7 +59,7 @@ public class AsyncEndpointDelayUoWTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.addOnCompletion(sync); + exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); } }).to("mock:before").to("log:before").delay(500).asyncDelayed().process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java index 957ef70..ce263b0 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWFailedTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -66,7 +67,7 @@ public class AsyncEndpointUoWFailedTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.addOnCompletion(sync); + exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); } }).to("mock:before").to("log:before").to("async:bye:camel").process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java index 8ec5f58..45a68d0 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.SynchronizationAdapter; @@ -60,7 +61,7 @@ public class AsyncEndpointUoWTest extends ContextTestSupport { from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { beforeThreadName = Thread.currentThread().getName(); - exchange.addOnCompletion(sync); + exchange.adapt(ExtendedExchange.class).addOnCompletion(sync); } }).to("mock:before").to("log:before").to("async:bye:camel").process(new Processor() { public void process(Exchange exchange) throws Exception { diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index f5c2567..25280a3 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -34,6 +34,7 @@ import javax.crypto.CipherInputStream; import javax.crypto.CipherOutputStream; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.spi.StreamCachingStrategy; @@ -231,7 +232,7 @@ public final class FileInputStreamCache extends InputStream implements StreamCac streamCacheUnitOfWork.addSynchronization(onCompletion); } else { // add on completion so we can cleanup after the exchange is done such as deleting temporary files - exchange.addOnCompletion(onCompletion); + exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); } } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index e263c9a..1248651 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -21,6 +21,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RouteAware; @@ -93,11 +94,11 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw // if the exchange doesn't have from route id set, then set it if it originated // from this unit of work if (route != null && exchange.getFromRouteId() == null) { - exchange.setFromRouteId(route.getId()); + exchange.adapt(ExtendedExchange.class).setFromRouteId(route.getId()); } UnitOfWork uow = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(exchange); - exchange.setUnitOfWork(uow); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow); uow.start(); return uow; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index b88ecdd..2533b0c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -17,7 +17,6 @@ package org.apache.camel.support; import java.util.ArrayList; -import java.util.Date; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -539,7 +538,7 @@ public final class DefaultExchange implements ExtendedExchange { public void handoverCompletions(Exchange target) { if (onCompletions != null) { for (Synchronization onCompletion : onCompletions) { - target.addOnCompletion(onCompletion); + target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); } // cleanup the temporary on completion list as they have been handed over onCompletions.clear(); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 52568cb..0696b72 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -35,6 +35,7 @@ import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.MessageHistory; import org.apache.camel.NoSuchBeanException; @@ -254,7 +255,7 @@ public final class ExchangeHelper { copy.getIn().setMessageId(null); } // do not share the unit of work - copy.setUnitOfWork(null); + copy.adapt(ExtendedExchange.class).setUnitOfWork(null); // do not reuse the message id // hand over on completion to the copy if we got any UnitOfWork uow = exchange.getUnitOfWork(); @@ -814,7 +815,7 @@ public final class ExchangeHelper { } if (handover) { // Need to hand over the completion for async invocation - exchange.handoverCompletions(answer); + exchange.adapt(ExtendedExchange.class).handoverCompletions(answer); } answer.setIn(exchange.getIn().copy()); if (exchange.hasOut()) { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java index afdc6d2..95bb88a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Route; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationRouteAware; @@ -64,7 +65,7 @@ public final class UnitOfWorkHelper { } // remove uow from exchange as its done - exchange.setUnitOfWork(null); + exchange.adapt(ExtendedExchange.class).setUnitOfWork(null); } public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {