Author: davsclaus Date: Sun Feb 14 09:59:12 2010 New Revision: 909995 URL: http://svn.apache.org/viewvc?rev=909995&view=rev Log: CAMEL-1686: Aggregator now lets completion predicate being evaluated on the fly which allows the predicate to trigger before the batch timeout.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=909995&r1=909994&r2=909995&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Sun Feb 14 09:59:12 2010 @@ -902,6 +902,7 @@ return; } + // super will invoke doStart which will prepare internal services before we continue and start the routes below super.start(); LOG.debug("Starting routes..."); @@ -972,6 +973,7 @@ for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { Integer order = entry.getKey(); Route route = entry.getValue().getRoute(); + RouteService routeService = entry.getValue().getRouteService(); for (Consumer consumer : routeService.getInputs().values()) { Endpoint endpoint = consumer.getEndpoint(); @@ -1028,7 +1030,7 @@ } catch (Exception e) { // fire event that we failed to start EventHelper.notifyCamelContextStartupFailed(this, e); - // rethrown cause + // rethrow cause throw e; } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=909995&r1=909994&r2=909995&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Sun Feb 14 09:59:12 2010 @@ -54,6 +54,7 @@ Predicate aggregationCompletedPredicate) { this(processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate)); this.correlationExpression = correlationExpression; + setCompletionPredicate(aggregationCompletedPredicate); } public Aggregator(Processor processor, AggregationCollection collection) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=909995&r1=909994&r2=909995&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sun Feb 14 09:59:12 2010 @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -30,6 +31,7 @@ import org.apache.camel.CamelException; import org.apache.camel.Exchange; import org.apache.camel.Navigate; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; @@ -47,6 +49,8 @@ */ public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> { + // TODO: Should aggregate on the fly as well + public static final long DEFAULT_BATCH_TIMEOUT = 1000L; public static final int DEFAULT_BATCH_SIZE = 100; @@ -57,6 +61,7 @@ private int outBatchSize; private boolean groupExchanges; private boolean batchConsumer; + private Predicate completionPredicate; private final Processor processor; private final Collection<Exchange> collection; @@ -154,6 +159,14 @@ this.batchConsumer = batchConsumer; } + public Predicate getCompletionPredicate() { + return completionPredicate; + } + + public void setCompletionPredicate(Predicate completionPredicate) { + this.completionPredicate = completionPredicate; + } + public Processor getProcessor() { return processor; } @@ -198,7 +211,7 @@ protected void processExchange(Exchange exchange) throws Exception { processor.process(exchange); if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing Exchange: " + exchange, exchange.getException()); + getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException()); } } @@ -242,6 +255,7 @@ private Queue<Exchange> queue; private Lock queueLock = new ReentrantLock(); private boolean exchangeEnqueued; + private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>(); private Condition exchangeEnqueuedCondition = queueLock.newCondition(); public BatchSender() { @@ -278,17 +292,38 @@ do { try { if (!exchangeEnqueued) { + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after " + batchTimeout + " ms."); + } exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS); } - if (!exchangeEnqueued) { - drainQueueTo(collection, batchSize); - } else { + // if the completion predicate was triggered then there is an exchange id which denotes when to complete + String id = null; + if (!completionPredicateMatched.isEmpty()) { + id = completionPredicateMatched.poll(); + } + + if (id != null || !exchangeEnqueued) { + if (LOG.isTraceEnabled()) { + if (id != null) { + LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate"); + } else { + LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout"); + } + } + drainQueueTo(collection, batchSize, id); + } else { exchangeEnqueued = false; + boolean drained = false; while (isInBatchCompleted(queue.size())) { - drainQueueTo(collection, batchSize); + drained = true; + drainQueueTo(collection, batchSize, id); } - + if (drained) { + LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received"); + } + if (!isOutBatchCompleted()) { continue; } @@ -320,7 +355,7 @@ /** * This method should be called with queueLock held */ - private void drainQueueTo(Collection<Exchange> collection, int batchSize) { + private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) { for (int i = 0; i < batchSize; ++i) { Exchange e = queue.poll(); if (e != null) { @@ -331,6 +366,10 @@ } catch (Throwable t) { getExceptionHandler().handleException(t); } + if (exchangeId != null && exchangeId.equals(e.getExchangeId())) { + // this batch is complete so stop draining + break; + } } else { break; } @@ -342,8 +381,22 @@ } public void enqueueExchange(Exchange exchange) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received exchange to be batched: " + exchange); + } queueLock.lock(); try { + // pre test whether the completion predicate matched + if (completionPredicate != null) { + boolean matches = completionPredicate.matches(exchange); + if (matches) { + if (LOG.isTraceEnabled()) { + LOG.trace("Exchange matched completion predicate: " + exchange); + } + // add this exchange to the list of exchanges which marks the batch as complete + completionPredicateMatched.add(exchange.getExchangeId()); + } + } queue.add(exchange); exchangeEnqueued = true; exchangeEnqueuedCondition.signal(); @@ -359,10 +412,13 @@ Exchange exchange = iter.next(); iter.remove(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending aggregated exchange: " + exchange); + } processExchange(exchange); } catch (Throwable t) { // must catch throwable to avoid growing memory - getExceptionHandler().handleException("Error processing Exchange: " + exchange, t); + getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t); } } } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=909995&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Sun Feb 14 09:59:12 2010 @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version $Revision$ + */ +public class AggregateCompletionPredicateTest extends ContextTestSupport { + + public void testCompletionPredicateBeforeTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B+C+END"); + // should be faster than 10 seconds + mock.setResultWaitTime(10000); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + assertMockEndpointsSatisfied(); + } + + public void testMultipleCompletionPredicateBeforeTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B+C+END", "D+E+END", "F+G+H+I+END"); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + template.sendBodyAndHeader("direct:start", "D", "id", "foo"); + template.sendBodyAndHeader("direct:start", "E", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + template.sendBodyAndHeader("direct:start", "F", "id", "foo"); + template.sendBodyAndHeader("direct:start", "G", "id", "foo"); + template.sendBodyAndHeader("direct:start", "H", "id", "foo"); + template.sendBodyAndHeader("direct:start", "I", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + assertMockEndpointsSatisfied(); + } + + public void testCompletionPredicateBeforeTimeoutTwoGroups() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END"); + // should be faster than 10 seconds + mock.setResultWaitTime(10000); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "1", "id", "bar"); + template.sendBodyAndHeader("direct:start", "2", "id", "bar"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "3", "id", "bar"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + template.sendBodyAndHeader("direct:start", "4", "id", "bar"); + template.sendBodyAndHeader("direct:start", "END", "id", "bar"); + + assertMockEndpointsSatisfied(); + } + + public void testMultipleCompletionPredicateBeforeTimeoutTwoGroups() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B+C+END", "1+2+3+4+END", "5+6+END", "D+E+END", "7+8+END", "F+G+H+I+END"); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "1", "id", "bar"); + template.sendBodyAndHeader("direct:start", "2", "id", "bar"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + template.sendBodyAndHeader("direct:start", "D", "id", "foo"); + template.sendBodyAndHeader("direct:start", "3", "id", "bar"); + template.sendBodyAndHeader("direct:start", "4", "id", "bar"); + template.sendBodyAndHeader("direct:start", "END", "id", "bar"); + + template.sendBodyAndHeader("direct:start", "5", "id", "bar"); + template.sendBodyAndHeader("direct:start", "6", "id", "bar"); + template.sendBodyAndHeader("direct:start", "E", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "bar"); + + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + template.sendBodyAndHeader("direct:start", "F", "id", "foo"); + template.sendBodyAndHeader("direct:start", "7", "id", "bar"); + template.sendBodyAndHeader("direct:start", "G", "id", "foo"); + template.sendBodyAndHeader("direct:start", "H", "id", "foo"); + template.sendBodyAndHeader("direct:start", "8", "id", "bar"); + template.sendBodyAndHeader("direct:start", "END", "id", "bar"); + + template.sendBodyAndHeader("direct:start", "I", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .completionPredicate(body().contains("END")).batchTimeout(20000) + .to("mock:aggregated"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date