Author: davsclaus Date: Tue Feb 17 16:44:38 2009 New Revision: 745139 URL: http://svn.apache.org/viewvc?rev=745139&view=rev Log: CAMEL-971: Introduced GroupedExchange for aggregate N exchanges into 1 single grouped exchange holding the X exchanges.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java?rev=745139&r1=745138&r2=745139&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileComponent.java Tue Feb 17 16:44:38 2009 @@ -53,7 +53,7 @@ /** * Header key holding file path to a local work directory containg a consumed file (if any) */ - public static final String HEADER_FILE_LOCAL_WORK_PATH= "CamelFileLocalWorkPath"; + public static final String HEADER_FILE_LOCAL_WORK_PATH = "CamelFileLocalWorkPath"; protected GenericFileEndpoint<File> buildFileEndpoint(String uri, String remaining, Map parameters) throws Exception { File file = new File(remaining); Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java?rev=745139&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java Tue Feb 17 16:44:38 2009 @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; + + +/** + * A grouped exchange that groups together other exchanges, as a holder object. + * <p/> + * This grouped exchange is useable for the aggregator so multiple exchanges can be grouped + * into this single exchange and thus only one exchange is sent for further processing. + */ +public class GroupedExchange extends DefaultExchange { + + private List<Exchange> exchanges = new ArrayList<Exchange>(); + + public GroupedExchange(CamelContext context) { + super(context); + } + + public GroupedExchange(CamelContext context, ExchangePattern pattern) { + super(context, pattern); + } + + public GroupedExchange(Exchange parent) { + super(parent); + } + + public GroupedExchange(Endpoint fromEndpoint) { + super(fromEndpoint); + } + + public GroupedExchange(Endpoint fromEndpoint, ExchangePattern pattern) { + super(fromEndpoint, pattern); + } + + public List<Exchange> getExchanges() { + return exchanges; + } + + public void setExchanges(List<Exchange> exchanges) { + this.exchanges = exchanges; + } + + public void addExchange(Exchange exchange) { + this.exchanges.add(exchange); + } + + public int size() { + return exchanges.size(); + } + + public Exchange get(int index) { + return exchanges.get(index); + } + + @Override + public String toString() { + return "Exchange[Grouped with: " + exchanges.size() + " exchanges]"; + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=745139&r1=745138&r2=745139&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Tue Feb 17 16:44:38 2009 @@ -66,9 +66,12 @@ private String strategyRef; @XmlAttribute(required = false) private String collectionRef; + @XmlAttribute(required = false) + private Boolean groupExchanges; @XmlElement(name = "completedPredicate", required = false) private ExpressionSubElementType completedPredicate; + public AggregatorType() { } @@ -168,6 +171,10 @@ if (outBatchSize != null) { aggregator.setOutBatchSize(outBatchSize); } + + if (groupExchanges != null) { + aggregator.setGroupExchanges(groupExchanges); + } return aggregator; } @@ -256,6 +263,14 @@ return completedPredicate; } + public Boolean getGroupExchanges() { + return groupExchanges; + } + + public void setGroupExchanges(Boolean groupExchanges) { + this.groupExchanges = groupExchanges; + } + // Fluent API //------------------------------------------------------------------------- @@ -337,6 +352,17 @@ } /** + * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single + * combined {...@link org.apache.camel.impl.GroupedExchange} class holding all the aggregated exchanges. + * + * @return the builder + */ + public AggregatorType groupExchanges() { + setGroupExchanges(true); + return this; + } + + /** * Sets the predicate used to determine if the aggregation is completed * * @return the clause used to create the predicate @@ -402,5 +428,5 @@ if (isInheritErrorHandler()) { output.setErrorHandlerBuilder(getErrorHandlerBuilder()); } - } + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=745139&r1=745138&r2=745139&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Feb 17 16:44:38 2009 @@ -22,6 +22,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.impl.GroupedExchange; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; import org.apache.camel.spi.ExceptionHandler; @@ -42,13 +43,14 @@ private long batchTimeout = DEFAULT_BATCH_TIMEOUT; private int batchSize = DEFAULT_BATCH_SIZE; private int outBatchSize; + private boolean groupExchanges; private Processor processor; private Collection<Exchange> collection; private ExceptionHandler exceptionHandler; private BatchSender sender; - + public BatchProcessor(Processor processor, Collection<Exchange> collection) { ObjectHelper.notNull(processor, "processor"); ObjectHelper.notNull(collection, "collection"); @@ -112,6 +114,14 @@ this.batchTimeout = batchTimeout; } + public boolean isGroupExchanges() { + return groupExchanges; + } + + public void setGroupExchanges(boolean groupExchanges) { + this.groupExchanges = groupExchanges; + } + public Processor getProcessor() { return processor; } @@ -219,11 +229,27 @@ } private void sendExchanges() throws Exception { + GroupedExchange grouped = null; + Iterator<Exchange> iter = collection.iterator(); while (iter.hasNext()) { Exchange exchange = iter.next(); iter.remove(); - processExchange(exchange); + if (!groupExchanges) { + // non grouped so process the exchange one at a time + processExchange(exchange); + } else { + // grouped so add all exchanges into one group + if (grouped == null) { + grouped = new GroupedExchange(exchange.getContext()); + } + grouped.addExchange(exchange); + } + } + + // and after adding process the single grouped exchange + if (grouped != null) { + processExchange(grouped); } } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=745139&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Tue Feb 17 16:44:38 2009 @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.GroupedExchange; + +/** + * Unit test for aggregate grouped exchanges. + */ +public class AggregateGroupedExchangeTest extends ContextTestSupport { + + public void testGrouped() throws Exception { + // START SNIPPET: e2 + MockEndpoint result = getMockEndpoint("mock:result"); + + // we expect 1 messages since we group all we get in using the same correlation key + result.expectedMessageCount(1); + + // then we sent all the message at once + template.sendBody("direct:start", "100"); + template.sendBody("direct:start", "150"); + template.sendBody("direct:start", "130"); + template.sendBody("direct:start", "200"); + template.sendBody("direct:start", "190"); + + assertMockEndpointsSatisfied(); + + Exchange out = result.getExchanges().get(0); + assertTrue(out instanceof GroupedExchange); + GroupedExchange grouped = (GroupedExchange)out; + assertEquals(5, grouped.size()); + + assertEquals("100", grouped.get(0).getIn().getBody(String.class)); + assertEquals("150", grouped.get(1).getIn().getBody(String.class)); + assertEquals("130", grouped.get(2).getIn().getBody(String.class)); + assertEquals("200", grouped.get(3).getIn().getBody(String.class)); + assertEquals("190", grouped.get(4).getIn().getBody(String.class)); + // END SNIPPET: e2 + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: e1 + // our route is aggregating from the direct queue and sending the response to the mock + from("direct:start") + // aggregated using id as correlation so each is unqiue and thus we batch everything + .aggregate().simple("id") + // wait for 0.5 seconds to aggregate + .batchTimeout(500L) + // group the exchanges so we get one single exchange containing all the others + .groupExchanges() + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date