Author: davsclaus Date: Wed Jun 13 14:49:36 2012 New Revision: 1349891 URL: http://svn.apache.org/viewvc?rev=1349891&view=rev Log: CAMEL-5366: Add a way of checking if an Exchange has a given onCompletion already
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1349891&r1=1349890&r2=1349891&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Jun 13 14:49:36 2012 @@ -486,6 +486,15 @@ public interface Exchange { void addOnCompletion(Synchronization onCompletion); /** + * Checks if the passed {@link org.apache.camel.spi.Synchronization} instance is + * already contained on this exchange. + * + * @param onCompletion the callback instance that is being checked for + * @return <tt>true</tt>, if callback instance is already contained on this exchange, else <tt>false</tt> + */ + boolean containsOnCompletion(Synchronization onCompletion); + + /** * Handover all the on completions from this exchange to the target exchange. * * @param target the target exchange Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=1349891&r1=1349890&r2=1349891&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Wed Jun 13 14:49:36 2012 @@ -394,6 +394,16 @@ public final class DefaultExchange imple } } + public boolean containsOnCompletion(Synchronization onCompletion) { + if (unitOfWork != null) { + // if there is an unit of work then the completions is moved there + return unitOfWork.containsSynchronization(onCompletion); + } else { + // check temporary completions if no unit of work yet + return onCompletions != null && onCompletions.contains(onCompletion); + } + } + public void handoverCompletions(Exchange target) { if (onCompletions != null) { for (Synchronization onCompletion : onCompletions) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1349891&r1=1349890&r2=1349891&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Wed Jun 13 14:49:36 2012 @@ -187,6 +187,10 @@ public class DefaultUnitOfWork implement } } + public synchronized boolean containsSynchronization(Synchronization synchronization) { + return synchronizations != null && synchronizations.contains(synchronization); + } + public void handoverSynchronization(Exchange target) { if (synchronizations == null || synchronizations.isEmpty()) { return; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java?rev=1349891&r1=1349890&r2=1349891&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/UnitOfWork.java Wed Jun 13 14:49:36 2012 @@ -44,6 +44,15 @@ public interface UnitOfWork extends Serv void removeSynchronization(Synchronization synchronization); /** + * Checks if the passed synchronization hook is already part of this unit of work. + * + * @param synchronization the hook + * @return <tt>true</tt>, if the passed synchronization is part of this unit of work, else <tt>false</tt> + */ + boolean containsSynchronization(Synchronization synchronization); + + /** + /** * Handover all the registered synchronizations to the target {@link org.apache.camel.Exchange}. * <p/> * This is used when a route turns into asynchronous and the {@link org.apache.camel.Exchange} that Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java?rev=1349891&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/OnCompletionContainsTest.java Wed Jun 13 14:49:36 2012 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.SynchronizationAdapter; + +/** + * @version + */ +public class OnCompletionContainsTest extends ContextTestSupport { + + class SimpleSynchronizationAdapter extends SynchronizationAdapter { + private String endPoint; + private String body; + + SimpleSynchronizationAdapter(String endPoint, String body) { + super(); + this.endPoint = endPoint; + this.body = body; + } + + @Override + public void onDone(Exchange exchange) { + template.sendBody(endPoint, body); + } + + @Override + public String toString() { + return body; + } + } + + public void testOnCompletionContainsTest() throws Exception { + getMockEndpoint("mock:sync").expectedBodiesReceived("C", "B", "B", "A", "Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onCompletion().to("mock:sync"); + + from("direct:start") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + SynchronizationAdapter adapter = new SimpleSynchronizationAdapter("mock:sync", "A"); + exchange.addOnCompletion(adapter); + + // should not add the adapter again as we already have it + if (!exchange.containsOnCompletion(adapter)) { + exchange.addOnCompletion(adapter); + } + + adapter = new SimpleSynchronizationAdapter("mock:sync", "B"); + exchange.addOnCompletion(adapter); + + // now add the B again as we want to test that this also work + if (exchange.containsOnCompletion(adapter)) { + exchange.addOnCompletion(adapter); + } + + // add a C that is no a SimpleSynchronizationAdapter class + exchange.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + template.sendBody("mock:sync", "C"); + } + + @Override + public String toString() { + return "C"; + } + }); + } + }) + .to("mock:result"); + } + }; + } + +} Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala?rev=1349891&r1=1349890&r2=1349891&view=diff ============================================================================== --- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala (original) +++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala Wed Jun 13 14:49:36 2012 @@ -114,6 +114,8 @@ class RichExchange(val exchange : Exchan def addOnCompletion(onCompletion: Synchronization) { exchange.addOnCompletion(onCompletion) } + def containsOnCompletion(onCompletion: Synchronization) = exchange.containsOnCompletion(onCompletion) + def handoverCompletions(exchange : Exchange) { exchange.handoverCompletions(exchange) } def handoverCompletions = exchange.handoverCompletions