Author: davsclaus Date: Mon Apr 12 09:29:28 2010 New Revision: 933163 URL: http://svn.apache.org/viewvc?rev=933163&view=rev Log: CAMEL-2623: Preparing for seda/vm to be able to work with chained request/reply pattern.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java camel/trunk/camel-core/src/test/resources/log4j.properties Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=933163&r1=933162&r2=933163&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Mon Apr 12 09:29:28 2010 @@ -64,20 +64,36 @@ public class SedaProducer extends Collec copy.addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange response) { - try { - ExchangeHelper.copyResults(exchange, response); - } finally { - // always ensure latch is triggered - latch.countDown(); + // check for timeout, which then already would have invoked the latch + if (latch.getCount() == 0) { + if (log.isTraceEnabled()) { + log.trace(this + ". Timeout occurred so response will be ignored: " + (response.hasOut() ? response.getOut() : response.getIn())); + } + return; + } else { + if (log.isTraceEnabled()) { + log.trace(this + " with response: " + (response.hasOut() ? 
response.getOut() : response.getIn())); + } + try { + ExchangeHelper.copyResults(exchange, response); + } finally { + // always ensure latch is triggered + latch.countDown(); + } } } + + @Override + public String toString() { + return "onDone at [" + endpoint.getEndpointUri() + "]"; + } }); queue.add(copy); if (timeout > 0) { if (log.isTraceEnabled()) { - log.trace("Waiting for task to complete using timeout (ms): " + timeout); + log.trace("Waiting for task to complete using timeout (ms): " + timeout + " at [" + endpoint.getEndpointUri() + "]"); } // lets see if we can get the task done before the timeout boolean done = latch.await(timeout, TimeUnit.MILLISECONDS); @@ -86,7 +102,7 @@ public class SedaProducer extends Collec } } else { if (log.isTraceEnabled()) { - log.trace("Waiting for task to complete (blocking)"); + log.trace("Waiting for task to complete (blocking) at [" + endpoint.getEndpointUri() + "]"); } // no timeout then wait until its done latch.await(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=933163&r1=933162&r2=933163&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Mon Apr 12 09:29:28 2010 @@ -17,6 +17,7 @@ package org.apache.camel.impl; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -98,6 +99,10 @@ public class DefaultUnitOfWork implement if (synchronizations == null) { synchronizations = new ArrayList<Synchronization>(); } + // must add to top of list so we run last added first (FILO) + if (LOG.isTraceEnabled()) { + LOG.trace("Adding synchronization " + synchronization); + 
} synchronizations.add(synchronization); } @@ -136,6 +141,8 @@ public class DefaultUnitOfWork implement } if (synchronizations != null && !synchronizations.isEmpty()) { + // reverse so we invoke it FILO style instead of FIFO + Collections.reverse(synchronizations); // invoke synchronization callbacks for (Synchronization synchronization : synchronizations) { try { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java?rev=933163&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java Mon Apr 12 09:29:28 2010 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +import static org.apache.camel.language.simple.SimpleLanguage.simple; + +/** + * @version $Revision$ + */ +public class SedaInOutChainedTest extends ContextTestSupport { + + public void testInOutSedaChained() throws Exception { + getMockEndpoint("mock:a").expectedBodiesReceived("start"); + getMockEndpoint("mock:b").expectedBodiesReceived("start-a"); + getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b"); + + String reply = template.requestBody("seda:a", "start", String.class); + assertEquals("start-a-b-c", reply); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:a").to("mock:a").transform(simple("${body}-a")).to("seda:b"); + + from("seda:b").to("mock:b").transform(simple("${body}-b")).to("seda:c"); + + from("seda:c").to("mock:c").transform(simple("${body}-c")); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaInOutChainedTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=933163&r1=933162&r2=933163&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/resources/log4j.properties (original) +++ camel/trunk/camel-core/src/test/resources/log4j.properties Mon Apr 12 09:29:28 2010 @@ -23,6 +23,8 @@ 
log4j.rootLogger=INFO, file log4j.logger.org.apache.activemq.spring=WARN #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component=TRACE +#log4j.logger.org.apache.camel.component.seda=TRACE +#log4j.logger.org.apache.camel.impl.DefaultUnitOfWork=TRACE #log4j.logger.org.apache.camel.component.mock=DEBUG #log4j.logger.org.apache.camel.component.file=TRACE log4j.logger.org.apache.camel.impl.converter=WARN