Author: davsclaus Date: Sat May 30 13:28:13 2009 New Revision: 780261 URL: http://svn.apache.org/viewvc?rev=780261&view=rev Log: CAMEL-825: The seda component now supports request/reply. When sending to a seda endpoint, the caller thread will wait for the reply if the exchange pattern is InOut.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInPipelineTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaUnitOfWorkTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Sat May 30 13:28:13 2009 @@ -29,7 +29,7 @@ * @version $Revision$ */ public class CollectionProducer extends DefaultProducer implements Processor { - private final Collection<Exchange> queue; + protected final Collection<Exchange> queue; public CollectionProducer(Endpoint endpoint, Collection<Exchange> queue) { super(endpoint); @@ -37,7 +37,6 @@ } public void process(Exchange exchange) throws Exception { - // TODO: We should consider using newCopy(true) as the async() DSL Exchange copy = exchange.copy(); queue.add(copy); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sat May 30 13:28:13 2009 @@ -40,6 +40,8 @@ @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, 1); - return new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers); + SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers); + answer.configureProperties(parameters); + return answer; } } Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sat May 30 13:28:13 2009 @@ -29,6 +29,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.BrowsableEndpoint; @@ -43,6 +44,7 @@ private BlockingQueue<Exchange> queue; private int size = 1000; private int concurrentConsumers = 1; + private WaitForTaskToComplete waitTaskComplete = WaitForTaskToComplete.IfReplyExpected; private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>(); private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>(); @@ -70,7 +72,7 @@ } public Producer createProducer() throws Exception { - return new SedaProducer(this, getQueue()); + return new SedaProducer(this, getQueue(), getWaitTaskComplete()); } public Consumer createConsumer(Processor processor) throws Exception { @@ -103,7 +105,15 @@ public int getConcurrentConsumers() { return concurrentConsumers; } - + + public WaitForTaskToComplete getWaitTaskComplete() { + return waitTaskComplete; + } + + public void setWaitTaskComplete(WaitForTaskToComplete waitTaskComplete) { + this.waitTaskComplete = waitTaskComplete; + } + public boolean isSingleton() { return true; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Sat May 30 13:28:13 2009 @@ -17,18 +17,70 @@ package org.apache.camel.component.seda; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import org.apache.camel.Exchange; +import org.apache.camel.WaitForTaskToComplete; +import org.apache.camel.impl.SynchronizationAdapter; +import org.apache.camel.util.ExchangeHelper; /** * @version $Revision$ */ public class SedaProducer extends CollectionProducer { - private SedaEndpoint endpoint; + private final SedaEndpoint endpoint; + private final WaitForTaskToComplete waitTaskComplete; - public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue) { + public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitTaskComplete) { super(endpoint, queue); this.endpoint = endpoint; + this.waitTaskComplete = waitTaskComplete; + } + + @Override + public void process(final Exchange exchange) throws Exception { + // use a new copy of the exchange to route async and handover the on completion to the new copy + // so its the new copy that performs the on completion callback when its done + Exchange copy = exchange.newCopy(true); + // set a new from endpoint to be the seda queue + copy.setFromEndpoint(endpoint); + + WaitForTaskToComplete wait = waitTaskComplete; + if (exchange.getIn().getHeader(Exchange.ASYNC_WAIT) != null) { + wait = exchange.getIn().getHeader(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); + } + + if (wait == WaitForTaskToComplete.Always || + (wait == WaitForTaskToComplete.IfReplyExpected && 
ExchangeHelper.isOutCapable(exchange))) { + + // check if there is a consumer otherwise we end up waiting forever + if (endpoint.getConsumers().isEmpty()) { + throw new IllegalStateException("Cannot send to endpoint: " + endpoint.getEndpointUri() + " as no consumers is registered." + + " With no consumers we end up waiting forever for the reply, as there are no consumers to process our exchange: " + exchange); + } + + // latch that waits until we are complete + final CountDownLatch latch = new CountDownLatch(1); + + // we should wait for the reply so install a on completion so we know when its complete + copy.addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange response) { + try { + ExchangeHelper.copyResults(exchange, response); + } finally { + // always ensure latch is triggered + latch.countDown(); + } + } + }); + + queue.add(copy); + latch.await(); + } else { + // no wait, eg its a InOnly then just add to queue and return + queue.add(copy); + } } @Override Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Sat May 30 13:28:13 2009 @@ -26,6 +26,8 @@ import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.TraceableUnitOfWork; import org.apache.camel.util.UuidGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * The default implementation of {...@link org.apache.camel.spi.UnitOfWork} @@ -33,6 +35,7 @@ * @version $Revision$ */ public class DefaultUnitOfWork implements TraceableUnitOfWork, Service { + private 
static final transient Log LOG = LogFactory.getLog(DefaultUnitOfWork.class); private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator(); private String id; @@ -92,10 +95,15 @@ if (synchronizations != null && !synchronizations.isEmpty()) { boolean failed = exchange.isFailed(); for (Synchronization synchronization : synchronizations) { - if (failed) { - synchronization.onFailure(exchange); - } else { - synchronization.onComplete(exchange); + try { + if (failed) { + synchronization.onFailure(exchange); + } else { + synchronization.onComplete(exchange); + } + } catch (Exception e) { + // must catch exceptions to ensure all synchronizations have a chance to run + LOG.error("Exception occured during onCompletion. This exception will be ignored: ", e); } } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronizationAdapter.java Sat May 30 13:28:13 2009 @@ -28,10 +28,14 @@ public class SynchronizationAdapter implements Synchronization { public void onComplete(Exchange exchange) { - // noop + onDone(exchange); } public void onFailure(Exchange exchange) { + onDone(exchange); + } + + public void onDone(Exchange exchange) { // noop } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Sat May 30 13:28:13 2009 @@ -39,9 +39,9 @@ */ public class ThreadsProcessor extends DelegateProcessor implements Processor { - private static final int DEFAULT_THREADPOOL_SIZE = 5; - private ExecutorService executorService; - private WaitForTaskToComplete waitTaskComplete; + protected static final int DEFAULT_THREADPOOL_SIZE = 5; + protected ExecutorService executorService; + protected WaitForTaskToComplete waitTaskComplete; public ThreadsProcessor(Processor output, ExecutorService executorService, WaitForTaskToComplete waitTaskComplete) { super(output); @@ -61,13 +61,7 @@ final Exchange copy = exchange.newCopy(true); // let it execute async and return the Future - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - // must use a copy of the original exchange for processing async - output.process(copy); - return copy; - } - }; + Callable<Exchange> task = createTask(output, copy); // sumbit the task Future<Exchange> future = getExecutorService().submit(task); @@ -93,6 +87,16 @@ } } + protected Callable<Exchange> createTask(final Processor output, final Exchange copy) { + return new Callable<Exchange>() { + public Exchange call() throws Exception { + // must use a copy of the original exchange for processing async + output.process(copy); + return copy; + } + }; + } + public ExecutorService getExecutorService() { if (executorService == null) { executorService = createExecutorService(); @@ -100,7 +104,7 @@ return executorService; } - private ExecutorService createExecutorService() { + protected ExecutorService createExecutorService() { return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInPipelineTest.java 
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInPipelineTest.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInPipelineTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInPipelineTest.java Sat May 30 13:28:13 2009 @@ -44,7 +44,7 @@ return new RouteBuilder() { public void configure() throws Exception { from("direct:start") - .pipeline("bean:one", "bean:two", "seda:x", "direct:y", "bean:three"); + .pipeline("bean:one", "bean:two", "direct:x", "direct:y", "bean:three"); } }; } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.interceptor.Tracer; + +/** + * @version $Revision$ + */ +public class SedaComplexInOutTest extends ContextTestSupport { + + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + getContext().addInterceptStrategy(new Tracer()); + + from("direct:start").to("seda:a"); + + from("seda:a").to("log:bar", "seda:b"); + from("seda:b").delay(10).to("direct:c"); + + from("direct:c").transform(constant("Bye World")).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaComplexInOutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaInOnlyTest extends ContextTestSupport { + + public void testInOnly() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo"); + + from("seda:foo").to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOnlyTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaInOutTest extends ContextTestSupport { + + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo"); + + from("seda:foo").transform(constant("Bye World")).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.seda; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaInOutWithErrorDeadLetterChannelTest extends ContextTestSupport { + + public void testInOutWithErrorUsingDLC() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + + try { + template.requestBody("direct:start", "Hello World", String.class); + fail("Should have thrown an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Damn I cannot do this", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0)); + + from("direct:start").to("seda:foo"); + + from("seda:foo").transform(constant("Bye World")) + .throwException(new IllegalArgumentException("Damn I cannot do this")) + .to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorDeadLetterChannelTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.seda; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaInOutWithErrorTest extends ContextTestSupport { + + public void testInOutWithError() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(0); + + try { + template.requestBody("direct:start", "Hello World", String.class); + fail("Should have thrown an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Damn I cannot do this", e.getCause().getMessage()); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo"); + + from("seda:foo").transform(constant("Bye World")) + .throwException(new IllegalArgumentException("Damn I cannot do this")) + .to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutWithErrorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java 
(added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaNoConsumerTest extends ContextTestSupport { + + public void testInOnly() throws Exception { + // no problem for in only as we do not expect a reply + template.sendBody("direct:start", "Hello World"); + } + + public void testInOut() throws Exception { + try { + template.requestBody("direct:start", "Hello World"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(IllegalStateException.class, e.getCause()); + assertTrue(e.getCause().getMessage().startsWith("Cannot send to endpoint: seda:foo as no consumers is registered.")); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo"); + } + }; + } +} Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaUnitOfWorkTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaUnitOfWorkTest.java?rev=780261&r1=780260&r2=780261&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaUnitOfWorkTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaUnitOfWorkTest.java Sat May 30 13:28:13 2009 @@ -43,8 +43,11 @@ assertMockEndpointsSatisfied(); + // give time for on completiom to run + Thread.sleep(100); + assertEquals("onCompleteA", sync); - assertEquals("processor", lastOne); + assertEquals("onCompleteA", lastOne); } @Override @@ -59,11 +62,9 @@ .to("seda:foo"); from("seda:foo") - // use a little delay to allow the first route to complete - .delay(200) .process(new Processor() { public void process(Exchange exchange) throws Exception { - assertEquals("onCompleteA", sync); + assertEquals(null, sync); } }) .process(new Processor() { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java (added) +++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaWaitForTaskCompleteTest extends ContextTestSupport { + + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + public void testInOnly() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // we send an in only but we use Always to wait for it to complete + // and since the route changes the payload we can get the response anyway + Exchange out = template.send("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + 
exchange.getIn().setBody("Hello World"); + exchange.setPattern(ExchangePattern.InOnly); + } + }); + assertEquals("Bye World", out.getOut().getBody()); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo?waitTaskComplete=Always"); + + from("seda:foo?waitTaskComplete=Always").transform(constant("Bye World")).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskCompleteTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaWaitForTaskIfReplyExpectedTest extends ContextTestSupport { + + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + public void testInOnly() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + Exchange out = template.send("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + exchange.setPattern(ExchangePattern.InOnly); + } + }); + // we do not expect a reply and thus do not wait so we just get our own input back + assertEquals("Hello World", out.getIn().getBody()); + assertEquals(null, out.getOut().getBody()); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { +
from("direct:start").to("seda:foo?waitTaskComplete=IfReplyExpected"); + + from("seda:foo?waitTaskComplete=IfReplyExpected").transform(constant("Bye World")).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskIfReplyExpectedTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java?rev=780261&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java Sat May 30 13:28:13 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaWaitForTaskNewerTest extends ContextTestSupport { + + public void testInOut() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + String out = template.requestBody("direct:start", "Hello World", String.class); + // we do not wait for the response so we just get our own input back + assertEquals("Hello World", out); + + assertMockEndpointsSatisfied(); + } + + public void testInOnly() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + Exchange out = template.send("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + exchange.setPattern(ExchangePattern.InOnly); + } + }); + // we do not wait for the response so we just get our own input back + assertEquals("Hello World", out.getIn().getBody()); + assertEquals(null, out.getOut().getBody()); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:foo?waitTaskComplete=Newer"); + + from("seda:foo?waitTaskComplete=Newer").transform(constant("Bye World")).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaWaitForTaskNewerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date