Author: davsclaus Date: Mon Feb 15 15:05:11 2010 New Revision: 910231 URL: http://svn.apache.org/viewvc?rev=910231&view=rev Log: CAMEL-1686: Overhaul of aggregator. Work in progress.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=910231&r1=910230&r2=910231&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Feb 15 15:05:11 2010 @@ -32,7 +32,8 @@ public interface Exchange { String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType"; - + + @Deprecated String AGGREGATED_INDEX = "CamelAggregatedIndex"; String AGGREGATED_SIZE = "CamelAggregatedSize"; Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910231&view=auto ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Feb 15 15:05:11 2010 @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregate; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Navigate; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.util.DefaultTimeoutMap; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.TimeoutMap; +import org.apache.camel.util.TimeoutMapEntry; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * <a href="http://camel.apache.org/aggregator.html">Aggregator</a> EIP pattern. + * + * @version $Revision$ + */ +public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor> { + + private static final Log LOG = LogFactory.getLog(AggregateProcessor.class); + + private TimeoutMap<Object, Exchange> timeoutMap; + private final Processor processor; + private final AggregationStrategy aggregationStrategy; + private final Expression correlationExpression; + private ExecutorService executorService; + private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository(); + + // different ways to have completion triggered + private boolean eagerEvaluateCompletionPredicate; + private Predicate completionPredicate; + private long completionTimeout; + private int completionAggregatedSize; + + public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) { + ObjectHelper.notNull(processor, "processor"); + ObjectHelper.notNull(correlationExpression, 
"correlationExpression"); + ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); + this.processor = processor; + this.correlationExpression = correlationExpression; + this.aggregationStrategy = aggregationStrategy; + } + + @Override + public String toString() { + return "AggregateProcessor[to: " + processor + "]"; + } + + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<Processor>(1); + answer.add(processor); + return answer; + } + + public boolean hasNext() { + return processor != null; + } + + public void process(Exchange exchange) throws Exception { + // compute correlation expression + Object key = correlationExpression.evaluate(exchange, Object.class); + if (ObjectHelper.isEmpty(key)) { + throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange); + } + + Exchange oldExchange = aggregationRepository.get(key); + Exchange newExchange = exchange; + + Integer size = 1; + if (oldExchange != null) { + size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class); + ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange); + size++; + } + + // are we complete? 
+ boolean complete = false; + if (isEagerEvaluateCompletionPredicate()) { + complete = isCompleted(key, exchange, size); + } + + // prepare the exchanges for aggregation and aggregate it + ExchangeHelper.prepareAggregation(oldExchange, newExchange); + newExchange = onAggregation(oldExchange, newExchange); + newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); + + // if not set to evaluate eager then do that after the aggregation + if (!isEagerEvaluateCompletionPredicate()) { + complete = isCompleted(key, newExchange, size); + } + + // only need to update aggregation repository if we are not complete + if (!complete && !newExchange.equals(oldExchange)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Put exchange:" + newExchange + " with correlation key:" + key); + } + aggregationRepository.add(key, newExchange); + } + + if (complete) { + onCompletion(key, newExchange); + } + } + + protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { + return aggregationStrategy.aggregate(oldExchange, newExchange); + } + + protected boolean isCompleted(Object key, Exchange exchange, int size) { + if (getCompletionPredicate() != null) { + boolean answer = getCompletionPredicate().matches(exchange); + if (answer) { + return true; + } + } + + if (getCompletionAggregatedSize() > 0) { + if (size >= getCompletionAggregatedSize()) { + return true; + } + } + + if (getCompletionTimeout() > 0) { + // timeout is used so use the timeout map to keep an eye on this + if (LOG.isDebugEnabled()) { + LOG.debug("Updating correlation key " + key + " to timeout after " + getCompletionTimeout() + " ms."); + } + timeoutMap.put(key, exchange, getCompletionTimeout()); + } + + return false; + } + + protected void onCompletion(Object key, final Exchange exchange) { + // remove from repository and timeout map as its completed + aggregationRepository.remove(key); + if (timeoutMap != null) { + timeoutMap.remove(key); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Aggregation complete 
for correlation key " + key + " sending aggregated exchange: " + exchange); + } + + // send this exchange + executorService.submit(new Runnable() { + public void run() { + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + } + }); + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public Predicate getCompletionPredicate() { + return completionPredicate; + } + + public void setCompletionPredicate(Predicate completionPredicate) { + this.completionPredicate = completionPredicate; + } + + public boolean isEagerEvaluateCompletionPredicate() { + return eagerEvaluateCompletionPredicate; + } + + public void setEagerEvaluateCompletionPredicate(boolean eagerEvaluateCompletionPredicate) { + this.eagerEvaluateCompletionPredicate = eagerEvaluateCompletionPredicate; + } + + public long getCompletionTimeout() { + return completionTimeout; + } + + public void setCompletionTimeout(long completionTimeout) { + this.completionTimeout = completionTimeout; + } + + public int getCompletionAggregatedSize() { + return completionAggregatedSize; + } + + public void setCompletionAggregatedSize(int completionAggregatedSize) { + this.completionAggregatedSize = completionAggregatedSize; + } + + /** + * Background tasks that looks for aggregated exchanges which is triggered by completion timeouts. 
+ */ + private class TimeoutReaper extends DefaultTimeoutMap<Object, Exchange> { + + private TimeoutReaper(ScheduledExecutorService executor, long requestMapPollTimeMillis) { + super(executor, requestMapPollTimeMillis); + } + + protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> entry) { + if (log.isDebugEnabled()) { + log.debug("Completion timeout triggered for correlation key: " + entry.getKey()); + } + onCompletion(entry.getKey(), entry.getValue()); + return true; + } + } + + @Override + protected void doStart() throws Exception { + if (getCompletionTimeout() <= 0 && getCompletionAggregatedSize() <= 0 && getCompletionPredicate() == null) { + throw new IllegalStateException("At least one of the completions options" + + " [completionTimeout, completionAggregatedSize, completionPredicate] must be set"); + } + + ServiceHelper.startService(aggregationRepository); + + if (executorService == null) { + executorService = ExecutorServiceHelper.newFixedThreadPool(10, "AggregateProcessor", true); + } + + // start timeout service if its in use + if (getCompletionTimeout() > 0) { + ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregationProcessorTimeoutReaper", true); + timeoutMap = new TimeoutReaper(scheduler, 1000L); + ServiceHelper.startService(timeoutMap); + } + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(timeoutMap); + + if (executorService != null) { + executorService.shutdown(); + executorService = null; + } + + ServiceHelper.stopService(aggregationRepository); + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ------------------------------------------------------------------------------ 
svn:keywords = Rev Date Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java?rev=910231&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java Mon Feb 15 15:05:11 2010 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import org.apache.camel.Exchange; + +/** + * Access to a repository to store aggregated exchanges to support pluggable implementations. + * + * @version $Revision$ + */ +public interface AggregationRepository<K> { + + /** + * Add the given {@link Exchange} under the correlation key. + * <p/> + * Will replace any existing exchange. 
+ * + * @param key the correlation key + * @param exchange the aggregated exchange + * @return the old exchange if any existed + */ + Exchange add(K key, Exchange exchange); + + /** + * Gets the given exchange with the correlation key + * + * @param key the correlation key + * @return the exchange, or <tt>null</tt> if no exchange was previously added + */ + Exchange get(K key); + + /** + * Removes the exchange with the given correlation key + * + * @param key the correlation key + */ + void remove(K key); + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=910231&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Mon Feb 15 15:05:11 2010 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.ServiceSupport; + +/** + * A memory based {@link org.apache.camel.processor.aggregate.AggregationRepository} which stores in memory only. + * + * @version $Revision$ + */ +public class MemoryAggregationRepository extends ServiceSupport implements AggregationRepository<Object> { + + private final Map<Object, Exchange> cache = new ConcurrentHashMap<Object, Exchange>(); + + public Exchange add(Object key, Exchange exchange) { + return cache.put(key, exchange); + } + + public Exchange get(Object key) { + return cache.get(key); + } + + public void remove(Object key) { + cache.remove(key); + } + + @Override + protected void doStart() throws Exception { + } + + @Override + protected void doStop() throws Exception { + cache.clear(); + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=910231&r1=910230&r2=910231&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Mon Feb 15 15:05:11 2010 @@ -53,7 +53,7 @@ boolean remove(E key); /** - * Confirms the key, after the exchange has been processed sucesfully. + * Confirms the key, after the exchange has been processed successfully. * * @param key the key of the message for duplicate test * @return <tt>true</tt> if the key was confirmed Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910231&r1=910230&r2=910231&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Mon Feb 15 15:05:11 2010 @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.camel.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,14 +34,15 @@ * * @version $Revision$ */ -public class DefaultTimeoutMap implements TimeoutMap, Runnable { +public class DefaultTimeoutMap<K, V> implements TimeoutMap<K, V>, Runnable, Service { - private static final transient Log LOG = LogFactory.getLog(DefaultTimeoutMap.class); + protected final transient Log log = LogFactory.getLog(getClass()); - private final Map<Object, Object> map = new HashMap<Object, Object>(); - private SortedSet<TimeoutMapEntry> index = new 
TreeSet<TimeoutMapEntry>(); - private ScheduledExecutorService executor; - private long purgePollTime; + private final Map<K, TimeoutMapEntry<K, V>> map = new HashMap<K, TimeoutMapEntry<K, V>>(); + private final SortedSet<TimeoutMapEntry<K, V>> index = new TreeSet<TimeoutMapEntry<K, V>>(); + private final ScheduledExecutorService executor; + private final long purgePollTime; + private final long initialDelay = 1000L; public DefaultTimeoutMap() { this(null, 1000L); @@ -52,10 +54,10 @@ schedulePoll(); } - public Object get(Object key) { - TimeoutMapEntry entry = null; + public V get(K key) { + TimeoutMapEntry<K, V> entry; synchronized (map) { - entry = (TimeoutMapEntry) map.get(key); + entry = map.get(key); if (entry == null) { return null; } @@ -66,10 +68,10 @@ return entry.getValue(); } - public void put(Object key, Object value, long timeoutMillis) { - TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis); + public void put(K key, V value, long timeoutMillis) { + TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis); synchronized (map) { - Object oldValue = map.put(key, entry); + TimeoutMapEntry<K, V> oldValue = map.put(key, entry); if (oldValue != null) { index.remove(oldValue); } @@ -78,9 +80,9 @@ } } - public void remove(Object id) { + public void remove(K id) { synchronized (map) { - TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id); + TimeoutMapEntry entry = map.remove(id); if (entry != null) { index.remove(entry); } @@ -90,7 +92,7 @@ public Object[] getKeys() { Object[] keys = null; synchronized (map) { - Set<Object> keySet = map.keySet(); + Set<K> keySet = map.keySet(); keys = new Object[keySet.size()]; keySet.toArray(keys); } @@ -108,21 +110,20 @@ */ public void run() { purge(); - schedulePoll(); } public void purge() { long now = currentTime(); synchronized (map) { - for (Iterator<TimeoutMapEntry> iter = index.iterator(); iter.hasNext();) { - TimeoutMapEntry entry = (TimeoutMapEntry) iter.next(); + for 
(Iterator<TimeoutMapEntry<K, V>> iter = index.iterator(); iter.hasNext();) { + TimeoutMapEntry<K, V> entry = iter.next(); if (entry == null) { break; } if (entry.getExpireTime() < now) { if (isValidForEviction(entry)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Evicting inactive request for correlationID: " + entry); + if (log.isDebugEnabled()) { + log.debug("Evicting inactive request for correlationID: " + entry); } map.remove(entry.getKey()); iter.remove(); @@ -153,14 +154,14 @@ */ protected void schedulePoll() { if (executor != null) { - executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS); + executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, TimeUnit.MILLISECONDS); } } /** * A hook to allow derivations to avoid evicting the current entry */ - protected boolean isValidForEviction(TimeoutMapEntry entry) { + protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) { return true; } @@ -172,4 +173,15 @@ protected long currentTime() { return System.currentTimeMillis(); } + + public void start() throws Exception { + } + + public void stop() throws Exception { + if (executor != null) { + executor.shutdown(); + } + map.clear(); + index.clear(); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=910231&r1=910230&r2=910231&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Mon Feb 15 15:05:11 2010 @@ -22,7 +22,7 @@ * * @version $Revision$ */ -public interface TimeoutMap extends Runnable { +public interface TimeoutMap<K, V> extends Runnable { /** * Looks up the value in the map by the given key. 
@@ -30,7 +30,7 @@ * @param key the key of the value to search for * @return the value for the given key or null if it is not present (or has timed out) */ - Object get(Object key); + V get(K key); /** * Returns a copy of the keys in the map @@ -46,14 +46,14 @@ * Adds the key value pair into the map such that some time after the given * timeout the entry will be evicted */ - void put(Object key, Object value, long timeoutMillis); + void put(K key, V value, long timeoutMillis); /** * Removes the object with the given key * * @param key key for the object to remove */ - void remove(Object key); + void remove(K key); /** * Purges any old entries from the map Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=910231&r1=910230&r2=910231&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Mon Feb 15 15:05:11 2010 @@ -23,19 +23,19 @@ * * @version $Revision$ */ -public class TimeoutMapEntry implements Comparable<Object>, Map.Entry<Object, Object> { - private Object key; - private Object value; +public class TimeoutMapEntry<K, V> implements Comparable<Object>, Map.Entry<K, V> { + private K key; + private V value; private long timeout; private long expireTime; - public TimeoutMapEntry(Object id, Object handler, long timeout) { + public TimeoutMapEntry(K id, V handler, long timeout) { this.key = id; this.value = handler; this.timeout = timeout; } - public Object getKey() { + public K getKey() { return key; } @@ -47,12 +47,12 @@ this.expireTime = expireTime; } - public Object getValue() { + public V getValue() { return value; } - public Object setValue(Object value) { - Object oldValue = this.value; + public V 
setValue(V value) { + V oldValue = this.value; this.value = value; return oldValue; } @@ -65,6 +65,7 @@ this.timeout = timeout; } + @SuppressWarnings("unchecked") public int compareTo(Object that) { if (this == that) { return 0; @@ -75,7 +76,7 @@ return 1; } - public int compareTo(TimeoutMapEntry that) { + public int compareTo(TimeoutMapEntry<K, V> that) { long diff = this.expireTime - that.expireTime; if (diff > 0) { return 1; Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910231&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Feb 15 15:05:11 2010 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.SendProcessor; +import org.apache.camel.processor.aggregate.AggregateProcessor; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version $Revision$ + */ +public class AggregateProcessorTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testAggregateProcessorCompletionPredicate() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B+END"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + Predicate complete = body().contains("END"); + + AggregateProcessor ap = new AggregateProcessor(done, corr, as); + ap.setCompletionPredicate(complete); + ap.setEagerEvaluateCompletionPredicate(false); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("END"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + ap.process(e2); + ap.process(e3); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } + + public void testAggregateProcessorCompletionPredicateEager() throws Exception { + MockEndpoint mock 
= getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B+END"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + Predicate complete = body().isEqualTo("END"); + + AggregateProcessor ap = new AggregateProcessor(done, corr, as); + ap.setCompletionPredicate(complete); + ap.setEagerEvaluateCompletionPredicate(true); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("END"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + ap.process(e2); + ap.process(e3); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } + + public void testAggregateProcessorCompletionAggregatedSize() throws Exception { + doTestAggregateProcessorCompletionAggregatedSize(false); + } + + public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception { + doTestAggregateProcessorCompletionAggregatedSize(true); + } + + private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B+C"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + + AggregateProcessor ap = new AggregateProcessor(done, corr, as); + ap.setCompletionAggregatedSize(3); + ap.setEagerEvaluateCompletionPredicate(eager); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new 
DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("C"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + ap.process(e2); + ap.process(e3); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } + + public void testAggregateProcessorCompletionTimeout() throws Exception { + doTestAggregateProcessorCompletionTimeout(false); + } + + public void testAggregateProcessorCompletionTimeoutEager() throws Exception { + doTestAggregateProcessorCompletionTimeout(true); + } + + private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A+B+C"); + + Processor done = new SendProcessor(context.getEndpoint("mock:result")); + Expression corr = header("id"); + AggregationStrategy as = new BodyInAggregatingStrategy(); + + AggregateProcessor ap = new AggregateProcessor(done, corr, as); + ap.setCompletionTimeout(3000); + ap.setEagerEvaluateCompletionPredicate(eager); + ap.start(); + + Exchange e1 = new DefaultExchange(context); + e1.getIn().setBody("A"); + e1.getIn().setHeader("id", 123); + + Exchange e2 = new DefaultExchange(context); + e2.getIn().setBody("B"); + e2.getIn().setHeader("id", 123); + + Exchange e3 = new DefaultExchange(context); + e3.getIn().setBody("C"); + e3.getIn().setHeader("id", 123); + + Exchange e4 = new DefaultExchange(context); + e4.getIn().setBody("D"); + e4.getIn().setHeader("id", 123); + + ap.process(e1); + + Thread.sleep(250); + ap.process(e2); + + Thread.sleep(500); + ap.process(e3); + + Thread.sleep(5000); + ap.process(e4); + + assertMockEndpointsSatisfied(); + + ap.stop(); + } + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java 
------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date