Author: ningjiang Date: Wed Jul 28 06:52:00 2010 New Revision: 979959 URL: http://svn.apache.org/viewvc?rev=979959&view=rev Log: CAMEL-3002 Add option steaming to recipient list EIP
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java (with props) camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java (with props) camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java?rev=979959&r1=979958&r2=979959&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java Wed Jul 28 06:52:00 2010 @@ -47,6 +47,7 @@ public @interface RecipientList { String delimiter() default ","; boolean parallelProcessing() default false; boolean stopOnException() default false; + boolean streaming() default false; boolean ignoreInvalidEndpoints() default false; String strategyRef() default ""; String executorServiceRef() default ""; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=979959&r1=979958&r2=979959&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java Wed Jul 28 06:52:00 2010 @@ -104,6 +104,7 @@ public class MethodInfo { recipientList.setStopOnException(annotation.stopOnException()); recipientList.setIgnoreInvalidEndpoints(annotation.ignoreInvalidEndpoints()); recipientList.setParallelProcessing(annotation.parallelProcessing()); + recipientList.setStreaming(annotation.streaming()); if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) { ExecutorService executor = CamelContextHelper.mandatoryLookup(camelContext, annotation.executorServiceRef(), ExecutorService.class); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=979959&r1=979958&r2=979959&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Wed Jul 28 06:52:00 2010 @@ -398,8 +398,8 @@ public class DefaultShutdownStrategy ext } if (size > 0) { try { - LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in " + - (TimeUnit.SECONDS.convert(getTimeout(), getTimeUnit()) - (loopCount++ * loopDelaySeconds)) + " seconds."); + LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in " + + (TimeUnit.SECONDS.convert(getTimeout(), getTimeUnit()) - (loopCount++ * loopDelaySeconds)) + " seconds."); Thread.sleep(loopDelaySeconds * 1000); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now."); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=979959&r1=979958&r2=979959&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Wed Jul 28 06:52:00 2010 @@ -57,6 +57,8 @@ public class RecipientListDefinition<Typ private Boolean stopOnException; @XmlAttribute(required = false) private Boolean ignoreInvalidEndpoints; + @XmlAttribute(required = false) + private Boolean streaming; public RecipientListDefinition() { } @@ -91,6 +93,7 @@ public class RecipientListDefinition<Typ } answer.setAggregationStrategy(createAggregationStrategy(routeContext)); answer.setParallelProcessing(isParallelProcessing()); + answer.setStreaming(isStreaming()); if (stopOnException != null) { answer.setStopOnException(isStopOnException()); } @@ -170,6 +173,16 @@ public class RecipientListDefinition<Typ setParallelProcessing(true); return this; } + + /** + * Doing the recipient list work in streaming model + * + * @return the builder + */ + public RecipientListDefinition<Type> streaming() { + setStreaming(true); + return this; + } /** * Will now stop further processing if an exception occurred during processing of an @@ -260,4 +273,12 @@ public class RecipientListDefinition<Typ public void setExecutorService(ExecutorService executorService) { this.executorService = executorService; } + + public void setStreaming(boolean streaming) { + this.streaming = streaming; + } + + public boolean isStreaming() { + return streaming != null ? streaming : false; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=979959&r1=979958&r2=979959&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Jul 28 06:52:00 2010 @@ -52,6 +52,7 @@ public class RecipientList extends Servi private boolean parallelProcessing; private boolean stopOnException; private boolean ignoreInvalidEndpoints; + private boolean streaming; private ExecutorService executorService; private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy(); @@ -106,7 +107,7 @@ public class RecipientList extends Servi Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter); RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(), - isParallelProcessing(), getExecutorService(), false, isStopOnException()); + isParallelProcessing(), getExecutorService(), isStreaming(), isStopOnException()); rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints()); // now let the multicast process the exchange @@ -133,7 +134,15 @@ public class RecipientList extends Servi protected void doStop() throws Exception { ServiceHelper.stopService(producerCache); } - + + public boolean isStreaming() { + return streaming; + } + + public void setStreaming(boolean streaming) { + this.streaming = streaming; + } + public boolean isIgnoreInvalidEndpoints() { return ignoreInvalidEndpoints; } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java?rev=979959&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java Wed Jul 28 06:52:00 2010 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class RecipientListParallelStreamingTest extends ContextTestSupport { + + public void testRecipientListParallel() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("c"); + + template.sendBodyAndHeader("direct:start", "Hello World", "foo", "direct:a,direct:b,direct:c"); + + assertMockEndpointsSatisfied(); + + mock.reset(); + mock.expectedBodiesReceived("b"); + + template.sendBodyAndHeader("direct:streaming", "Hello World", "foo", "direct:a,direct:b,direct:c"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .recipientList(header("foo")).parallelProcessing().to("mock:result"); + + from("direct:streaming") + .recipientList(header("foo")).parallelProcessing().streaming().to("mock:result"); + + from("direct:a").delay(100).transform(constant("a")); + from("direct:b").delay(500).transform(constant("b")); + from("direct:c").transform(constant("c")); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java?rev=979959&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java (added) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java Wed Jul 28 06:52:00 2010 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.RecipientListParallelStreamingTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +public class SpringRecipientListParallelStreamingTest extends RecipientListParallelStreamingTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, + "org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml"); + } +} \ No newline at end of file Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml?rev=979959&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml (added) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml Wed Jul 28 06:52:00 2010 @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:start"/> + <recipientList parallelProcessing="true"> + <header>foo</header> + </recipientList> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:streaming"/> + <recipientList parallelProcessing="true" streaming="true"> + <header>foo</header> + </recipientList> + <to uri="mock:result"/> + </route> + + <route> + <from uri="direct:a"/> + <delay><constant>100</constant></delay> + <transform><constant>a</constant></transform> + </route> + + <route> + <from uri="direct:b"/> + <delay><constant>500</constant></delay> + <transform><constant>b</constant></transform> + </route> + + <route> + <from uri="direct:c"/> + <transform><constant>c</constant></transform> + </route> + + </camelContext> + +</beans> Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml ------------------------------------------------------------------------------ svn:mime-type = text/xml