Author: davsclaus Date: Thu Jul 16 12:14:24 2009 New Revision: 794648 URL: http://svn.apache.org/viewvc?rev=794648&view=rev Log: CAMEL-1835: added timeout option to seda producer.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Thu Jul 16 12:14:24 2009 @@ -81,7 +81,6 @@ private Object propertyValue; private Object actualProperty; private Processor reporter; - private int collectMaximumExchanges = -1; public MockEndpoint(String endpointUri, Component component) { super(endpointUri, component); @@ -768,14 +767,6 @@ this.reporter = reporter; } - public int getCollectMaximumExchanges() { - return collectMaximumExchanges; - } - - public void setCollectMaximumExchanges(int collectMaximumExchanges) { - this.collectMaximumExchanges = collectMaximumExchanges; - } - // Implementation methods // ------------------------------------------------------------------------- private void init() { @@ -792,7 +783,6 @@ expectedMinimumCount = -1; expectedBodyValues = null; actualBodyValues = new ArrayList(); - collectMaximumExchanges = -1; } protected synchronized void onExchange(Exchange exchange) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=794648&r1=794647&r2=794648&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Jul 16 12:14:24 2009 @@ -45,6 +45,7 @@ private int size = 1000; private int concurrentConsumers = 1; private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; + private long timeout = 30000; private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>(); private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>(); @@ -72,7 +73,7 @@ } public Producer createProducer() throws Exception { - return new SedaProducer(this, getQueue(), getWaitForTaskToComplete()); + return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout()); } public Consumer createConsumer(Processor processor) throws Exception { @@ -114,6 +115,14 @@ this.waitForTaskToComplete = waitForTaskToComplete; } + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + public boolean isSingleton() { return true; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=794648&r1=794647&r2=794648&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Thu Jul 16 12:14:24 2009 @@ -18,8 +18,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.WaitForTaskToComplete; import org.apache.camel.impl.SynchronizationAdapter; import org.apache.camel.util.ExchangeHelper; @@ -30,11 +32,13 @@ public class SedaProducer extends CollectionProducer { private final SedaEndpoint endpoint; private final WaitForTaskToComplete waitForTaskToComplete; + private final long timeout; - public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete) { + public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete, long timeout) { super(endpoint, queue); this.endpoint = endpoint; this.waitForTaskToComplete = waitForTaskToComplete; + this.timeout = timeout; } @Override @@ -53,14 +57,6 @@ if (wait == WaitForTaskToComplete.Always || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) { - // only check for if there is a consumer if its the seda endpoint where we exepect a consumer in the same - // camel context. If you use the vm component the consumer could be in another camel context. - // for seda we want to check that a consumer exists otherwise we end up waiting forever for the response. - if (endpoint.getEndpointUri().startsWith("seda") && endpoint.getConsumers().isEmpty()) { - throw new IllegalStateException("Cannot send to endpoint: " + endpoint.getEndpointUri() + " as no consumers is registered." - + " With no consumers we end up waiting forever for the reply, as there are no consumers to process our exchange: " + exchange); - } - // latch that waits until we are complete final CountDownLatch latch = new CountDownLatch(1); @@ -78,7 +74,11 @@ }); queue.add(copy); - latch.await(); + // lets see if we can get the task done before the timeout + boolean done = latch.await(timeout, TimeUnit.MILLISECONDS); + if (!done) { + exchange.setException(new ExchangeTimedOutException(exchange, timeout)); + } } else { // no wait, eg its a InOnly then just add to queue and return queue.add(copy); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=794648&r1=794647&r2=794648&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Thu Jul 16 12:14:24 2009 @@ -16,7 +16,6 @@ */ package org.apache.camel.impl; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -31,7 +30,6 @@ import org.apache.camel.Message; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; -import org.apache.camel.Producer; import org.apache.camel.ProducerTemplate; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ExchangeHelper; Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java?rev=794648&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java Thu Jul 16 12:14:24 2009 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultProducerTemplate; + +/** + * @version $Revision$ + */ +public class SedaConcurrentTest extends ContextTestSupport { + + public void testSedaConcurrentInOnly() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(20); + + // should at least take 3 sec + mock.setMinimumResultWaitTime(3000); + + for (int i = 0; i < 20; i++) { + template.sendBody("seda:foo", "Message " + i); + } + + assertMockEndpointsSatisfied(); + } + + public void testSedaConcurrentInOnlyWithAsync() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(20); + + // should at least take 3 sec + mock.setMinimumResultWaitTime(3000); + + for (int i = 0; i < 20; i++) { + template.asyncSendBody("seda:foo", "Message " + i); + } + + assertMockEndpointsSatisfied(); + } + + public void testSedaConcurrentInOut() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(20); + mock.allMessages().body().startsWith("Bye"); + + // should at least take 3 sec + mock.setMinimumResultWaitTime(3000); + + ExecutorService executors = Executors.newFixedThreadPool(10); + List<Object> replies = new ArrayList<Object>(20); + for (int i = 0; i < 20; i++) { + final int num = i; + Object out = executors.submit(new Callable<Object>() { + public Object call() throws Exception { + return template.requestBody("seda:bar", "Message " + num); + } + }); + replies.add(out); + } + + assertMockEndpointsSatisfied(); + + assertEquals(20, replies.size()); + } + + public void testSedaConcurrentInOutWithAsync() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(20); + mock.allMessages().body().startsWith("Bye"); + + // should at least take 3 sec + mock.setMinimumResultWaitTime(3000); + + // use our own template that has a higher thread pool than default camel that uses 5 + ProducerTemplate pt = new DefaultProducerTemplate(context, Executors.newFixedThreadPool(10)); + + List<Future> replies = new ArrayList<Future>(20); + for (int i = 0; i < 20; i++) { + Future<Object> out = pt.asyncRequestBody("seda:bar", "Message " + i); + replies.add(out); + } + + assertMockEndpointsSatisfied(); + + assertEquals(20, replies.size()); + for (int i = 0; i < 20; i++) { + String out = (String) replies.get(i).get(); + assertTrue(out.startsWith("Bye")); + } + + pt.stop(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?concurrentConsumers=10") + .to("mock:before").delay(2000).to("mock:result"); + + from("seda:bar?concurrentConsumers=10") + .to("mock:before").delay(2000).transform(body().prepend("Bye ")).to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java?rev=794648&r1=794647&r2=794648&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaNoConsumerTest.java Thu Jul 16 12:14:24 2009 @@ -18,6 +18,7 @@ import org.apache.camel.CamelExecutionException; import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.builder.RouteBuilder; /** @@ -33,9 +34,9 @@ public void testInOut() throws Exception { try { template.requestBody("direct:start", "Hello World"); + fail("Should throw an exception"); } catch (CamelExecutionException e) { - assertIsInstanceOf(IllegalStateException.class, e.getCause()); - assertTrue(e.getCause().getMessage().startsWith("Cannot send to endpoint: seda://foo as no consumers is registered.")); + assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause()); } } @@ -44,7 +45,7 @@ return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("seda:foo"); + from("direct:start").to("seda:foo?timeout=1000"); } }; } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java?rev=794648&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java Thu Jul 16 12:14:24 2009 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SedaTimeoutTest extends ContextTestSupport { + + public void testSedaNoTineout() throws Exception { + Future<String> out = template.asyncRequestBody("seda:foo", "World", String.class); + assertEquals("Bye World", out.get()); + } + + public void testSedaTineout() throws Exception { + Future<String> out = template.asyncRequestBody("seda:foo?timeout=1000", "World", String.class); + try { + out.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertIsInstanceOf(CamelExecutionException.class, e.getCause()); + assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause().getCause()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").to("mock:before").delay(3000).transform(body().prepend("Bye ")).to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date