Author: davsclaus
Date: Mon Feb 15 15:05:11 2010
New Revision: 910231

URL: http://svn.apache.org/viewvc?rev=910231&view=rev
Log:
CAMEL-1686: Overhaul of aggregator. Work in progress.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
   (with props)
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
   (with props)
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=910231&r1=910230&r2=910231&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Feb 
15 15:05:11 2010
@@ -32,7 +32,8 @@
 public interface Exchange {
 
     String ACCEPT_CONTENT_TYPE = "CamelAcceptContentType";
-    
+
+    @Deprecated
     String AGGREGATED_INDEX = "CamelAggregatedIndex";
     String AGGREGATED_SIZE  = "CamelAggregatedSize";
 

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910231&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Mon Feb 15 15:05:11 2010
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Navigate;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.DefaultTimeoutMap;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.TimeoutMap;
+import org.apache.camel.util.TimeoutMapEntry;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <a href="http://camel.apache.org/aggregator.html">Aggregator</a> EIP pattern.
+ * <p/>
+ * Incoming exchanges are grouped by a correlation key, merged using the
+ * {@link AggregationStrategy} and kept in the {@link AggregationRepository}
+ * until one of the configured completion triggers (predicate, aggregated size
+ * or timeout) fires. The completed exchange is then handed over to the target
+ * processor using a thread from the executor service.
+ *
+ * @version $Revision$
+ */
+public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
+
+    private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
+
+    // only created in doStart() when a completion timeout is configured
+    private TimeoutMap<Object, Exchange> timeoutMap;
+    private final Processor processor;
+    private final AggregationStrategy aggregationStrategy;
+    private final Expression correlationExpression;
+    private ExecutorService executorService;
+    private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository();
+
+    // different ways to have completion triggered
+    private boolean eagerEvaluateCompletionPredicate;
+    private Predicate completionPredicate;
+    private long completionTimeout;
+    private int completionAggregatedSize;
+
+    /**
+     * Creates the aggregator.
+     *
+     * @param processor             the processor which receives the completed (aggregated) exchanges
+     * @param correlationExpression expression evaluated on each incoming exchange to compute the correlation key
+     * @param aggregationStrategy   strategy used to merge the incoming exchange with the already aggregated one
+     */
+    public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+        ObjectHelper.notNull(processor, "processor");
+        ObjectHelper.notNull(correlationExpression, "correlationExpression");
+        ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
+        this.processor = processor;
+        this.correlationExpression = correlationExpression;
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    @Override
+    public String toString() {
+        return "AggregateProcessor[to: " + processor + "]";
+    }
+
+    /**
+     * Returns the target processor as a single element list, or <tt>null</tt>
+     * if there is no next processor.
+     */
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<Processor>(1);
+        answer.add(processor);
+        return answer;
+    }
+
+    public boolean hasNext() {
+        return processor != null;
+    }
+
+    /**
+     * Aggregates the incoming exchange with the exchange already stored under
+     * the same correlation key (if any) and triggers completion when one of
+     * the completion conditions is satisfied.
+     *
+     * @param exchange the incoming exchange
+     * @throws CamelExchangeException if the correlation key evaluates to an empty value
+     * @throws Exception              if the aggregation fails
+     */
+    public void process(Exchange exchange) throws Exception {
+        // compute correlation expression
+        Object key = correlationExpression.evaluate(exchange, Object.class);
+        if (ObjectHelper.isEmpty(key)) {
+            throw new CamelExchangeException("Correlation key could not be evaluated to a value", exchange);
+        }
+
+        Exchange oldExchange = aggregationRepository.get(key);
+        Exchange newExchange = exchange;
+
+        // the first exchange of a group has size 1, otherwise continue from the stored size
+        Integer size = 1;
+        if (oldExchange != null) {
+            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
+            ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
+            size++;
+        }
+
+        // are we complete? (eager means evaluating on the incoming exchange before aggregating)
+        boolean eager = isEagerEvaluateCompletionPredicate();
+        boolean complete = false;
+        if (eager) {
+            complete = isCompleted(key, exchange, size);
+        }
+
+        // prepare the exchanges for aggregation and aggregate it
+        ExchangeHelper.prepareAggregation(oldExchange, newExchange);
+        newExchange = onAggregation(oldExchange, newExchange);
+        newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+
+        // if not set to evaluate eager then do that after the aggregation
+        if (!eager) {
+            complete = isCompleted(key, newExchange, size);
+        }
+
+        if (complete) {
+            // no need to update the aggregation repository as the group is done
+            onCompletion(key, newExchange);
+            return;
+        }
+
+        // only need to update aggregation repository if we are not complete
+        if (!newExchange.equals(oldExchange)) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Put exchange:" + newExchange + " with correlation key:" + key);
+            }
+            aggregationRepository.add(key, newExchange);
+        }
+
+        // the eager check registered the incoming exchange in the timeout map, which is
+        // not necessarily the instance returned by the aggregation strategy; re-register
+        // so a timeout sends the aggregated exchange and not a single un-aggregated one
+        if (eager && timeoutMap != null && getCompletionTimeout() > 0) {
+            timeoutMap.put(key, newExchange, getCompletionTimeout());
+        }
+    }
+
+    /**
+     * Merges the two exchanges using the aggregation strategy.
+     *
+     * @param oldExchange the exchange aggregated so far, <tt>null</tt> for the first exchange of a group
+     * @param newExchange the incoming exchange
+     * @return the aggregated exchange
+     */
+    protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
+        return aggregationStrategy.aggregate(oldExchange, newExchange);
+    }
+
+    /**
+     * Tests whether the group is complete, checking the completion predicate
+     * first and then the aggregated size.
+     * <p/>
+     * If neither matched and a completion timeout is configured, the given
+     * exchange is (re-)registered in the timeout map as a side effect.
+     *
+     * @param key      the correlation key
+     * @param exchange the exchange to evaluate the predicate on
+     * @param size     the number of exchanges aggregated so far, including the current one
+     * @return <tt>true</tt> if the group is complete
+     */
+    protected boolean isCompleted(Object key, Exchange exchange, int size) {
+        if (getCompletionPredicate() != null) {
+            boolean answer = getCompletionPredicate().matches(exchange);
+            if (answer) {
+                return true;
+            }
+        }
+
+        if (getCompletionAggregatedSize() > 0) {
+            if (size >= getCompletionAggregatedSize()) {
+                return true;
+            }
+        }
+
+        if (getCompletionTimeout() > 0) {
+            // timeout is used so use the timeout map to keep an eye on this
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating correlation key " + key + " to timeout after " + getCompletionTimeout() + " ms.");
+            }
+            timeoutMap.put(key, exchange, getCompletionTimeout());
+        }
+
+        return false;
+    }
+
+    /**
+     * Completes the group triggered by an incoming exchange.
+     *
+     * @param key      the correlation key
+     * @param exchange the aggregated exchange to send
+     */
+    protected void onCompletion(Object key, final Exchange exchange) {
+        onCompletion(key, exchange, false);
+    }
+
+    /**
+     * Completes the group: removes it from the repository (and from the
+     * timeout map when applicable) and submits the aggregated exchange to the
+     * executor service for processing by the target processor.
+     *
+     * @param key         the correlation key
+     * @param exchange    the aggregated exchange to send
+     * @param fromTimeout <tt>true</tt> if triggered by the timeout reaper, in which case the timeout map
+     *                    is left untouched: the reaper is iterating it and evicts the entry itself
+     */
+    protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
+        // remove from repository as its completed
+        aggregationRepository.remove(key);
+        // removing from the timeout map while the reaper iterates it would make the
+        // reaper fail with a ConcurrentModificationException, so only do this when
+        // completion was triggered by an incoming exchange
+        if (!fromTimeout && timeoutMap != null) {
+            timeoutMap.remove(key);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange);
+        }
+
+        // send this exchange
+        executorService.submit(new Runnable() {
+            public void run() {
+                try {
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Gets the executor service used for sending the completed exchanges.
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /**
+     * Sets the executor service used for sending the completed exchanges.
+     * If none is set a default thread pool is created when starting.
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    /**
+     * Gets the predicate which completes a group when it matches.
+     */
+    public Predicate getCompletionPredicate() {
+        return completionPredicate;
+    }
+
+    public void setCompletionPredicate(Predicate completionPredicate) {
+        this.completionPredicate = completionPredicate;
+    }
+
+    /**
+     * Whether completion is evaluated on the incoming exchange before it is
+     * aggregated (<tt>true</tt>) or on the aggregated exchange (<tt>false</tt>).
+     */
+    public boolean isEagerEvaluateCompletionPredicate() {
+        return eagerEvaluateCompletionPredicate;
+    }
+
+    public void setEagerEvaluateCompletionPredicate(boolean eagerEvaluateCompletionPredicate) {
+        this.eagerEvaluateCompletionPredicate = eagerEvaluateCompletionPredicate;
+    }
+
+    /**
+     * Gets the completion timeout in millis. A value of zero or less disables the timeout.
+     */
+    public long getCompletionTimeout() {
+        return completionTimeout;
+    }
+
+    public void setCompletionTimeout(long completionTimeout) {
+        this.completionTimeout = completionTimeout;
+    }
+
+    /**
+     * Gets the number of aggregated exchanges which completes a group.
+     * A value of zero or less disables completion by size.
+     */
+    public int getCompletionAggregatedSize() {
+        return completionAggregatedSize;
+    }
+
+    public void setCompletionAggregatedSize(int completionAggregatedSize) {
+        this.completionAggregatedSize = completionAggregatedSize;
+    }
+
+    /**
+     * Background tasks that looks for aggregated exchanges which is triggered by completion timeouts.
+     */
+    private class TimeoutReaper extends DefaultTimeoutMap<Object, Exchange> {
+
+        private TimeoutReaper(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+            super(executor, requestMapPollTimeMillis);
+        }
+
+        @Override
+        protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> entry) {
+            if (log.isDebugEnabled()) {
+                log.debug("Completion timeout triggered for correlation key: " + entry.getKey());
+            }
+            // invoked while the timeout map is being purged, so flag the completion
+            // as timeout triggered to leave the timeout map alone
+            onCompletion(entry.getKey(), entry.getValue(), true);
+            return true;
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (getCompletionTimeout() <= 0 && getCompletionAggregatedSize() <= 0 && getCompletionPredicate() == null) {
+            throw new IllegalStateException("At least one of the completions options"
+                    + " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
+        }
+
+        ServiceHelper.startService(aggregationRepository);
+
+        if (executorService == null) {
+            executorService = ExecutorServiceHelper.newFixedThreadPool(10, "AggregateProcessor", true);
+        }
+
+        // start timeout service if its in use
+        if (getCompletionTimeout() > 0) {
+            ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregationProcessorTimeoutReaper", true);
+            timeoutMap = new TimeoutReaper(scheduler, 1000L);
+            ServiceHelper.startService(timeoutMap);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(timeoutMap);
+
+        if (executorService != null) {
+            executorService.shutdown();
+            executorService = null;
+        }
+
+        ServiceHelper.stopService(aggregationRepository);
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java?rev=910231&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
 Mon Feb 15 15:05:11 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Access to a repository to store aggregated exchanges to support pluggable implementations.
+ *
+ * @param <K> the type of the correlation key
+ * @version $Revision$
+ */
+public interface AggregationRepository<K> {
+
+    /**
+     * Add the given {@link Exchange} under the correlation key.
+     * <p/>
+     * Will replace any existing exchange.
+     *
+     * @param key  the correlation key
+     * @param exchange the aggregated exchange
+     * @return the old exchange if any existed
+     */
+    Exchange add(K key, Exchange exchange);
+
+    /**
+     * Gets the given exchange with the correlation key
+     *
+     * @param key the correlation key
+     * @return the exchange, or <tt>null</tt> if no exchange was previously added
+     */
+    Exchange get(K key);
+
+    /**
+     * Removes the exchange with the given correlation key
+     *
+     * @param key the correlation key
+     */
+    void remove(K key);
+
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=910231&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
 Mon Feb 15 15:05:11 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.ServiceSupport;
+
+/**
+ * An {@link org.apache.camel.processor.aggregate.AggregationRepository} which keeps the
+ * aggregated exchanges in memory only; the stored exchanges are discarded when it is stopped.
+ *
+ * @version $Revision$
+ */
+public class MemoryAggregationRepository extends ServiceSupport implements AggregationRepository<Object> {
+
+    // correlation key -> aggregated exchange
+    private final Map<Object, Exchange> exchanges = new ConcurrentHashMap<Object, Exchange>();
+
+    /**
+     * Stores the exchange under the key, returning the exchange it replaced (if any).
+     */
+    public Exchange add(Object key, Exchange exchange) {
+        Exchange previous = exchanges.put(key, exchange);
+        return previous;
+    }
+
+    /**
+     * Looks up the exchange stored under the key.
+     */
+    public Exchange get(Object key) {
+        Exchange stored = exchanges.get(key);
+        return stored;
+    }
+
+    /**
+     * Drops the exchange stored under the key.
+     */
+    public void remove(Object key) {
+        exchanges.remove(key);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // nothing to start
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // forget everything that has not been completed
+        exchanges.clear();
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=910231&r1=910230&r2=910231&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
 Mon Feb 15 15:05:11 2010
@@ -53,7 +53,7 @@
     boolean remove(E key);
 
     /**
-     * Confirms the key, after the exchange has been processed sucesfully.
+     * Confirms the key, after the exchange has been processed successfully.
      *
      * @param key the key of the message for duplicate test
      * @return <tt>true</tt> if the key was confirmed

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910231&r1=910230&r2=910231&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
 Mon Feb 15 15:05:11 2010
@@ -25,6 +25,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,14 +34,15 @@
  *
  * @version $Revision$
  */
-public class DefaultTimeoutMap implements TimeoutMap, Runnable {
+public class DefaultTimeoutMap<K, V> implements TimeoutMap<K, V>, Runnable, 
Service {
 
-    private static final transient Log LOG = 
LogFactory.getLog(DefaultTimeoutMap.class);
+    protected final transient Log log = LogFactory.getLog(getClass());
 
-    private final Map<Object, Object> map = new HashMap<Object, Object>();
-    private SortedSet<TimeoutMapEntry> index = new TreeSet<TimeoutMapEntry>();
-    private ScheduledExecutorService executor;
-    private long purgePollTime;
+    private final Map<K, TimeoutMapEntry<K, V>> map = new HashMap<K, 
TimeoutMapEntry<K, V>>();
+    private final SortedSet<TimeoutMapEntry<K, V>> index = new 
TreeSet<TimeoutMapEntry<K, V>>();
+    private final ScheduledExecutorService executor;
+    private final long purgePollTime;
+    private final long initialDelay = 1000L;
 
     public DefaultTimeoutMap() {
         this(null, 1000L);
@@ -52,10 +54,10 @@
         schedulePoll();
     }
 
-    public Object get(Object key) {
-        TimeoutMapEntry entry = null;
+    public V get(K key) {
+        TimeoutMapEntry<K, V> entry;
         synchronized (map) {
-            entry = (TimeoutMapEntry) map.get(key);
+            entry = map.get(key);
             if (entry == null) {
                 return null;
             }
@@ -66,10 +68,10 @@
         return entry.getValue();
     }
 
-    public void put(Object key, Object value, long timeoutMillis) {
-        TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis);
+    public void put(K key, V value, long timeoutMillis) {
+        TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, 
timeoutMillis);
         synchronized (map) {
-            Object oldValue = map.put(key, entry);
+            TimeoutMapEntry<K, V> oldValue = map.put(key, entry);
             if (oldValue != null) {
                 index.remove(oldValue);
             }
@@ -78,9 +80,9 @@
         }
     }
 
-    public void remove(Object id) {
+    public void remove(K id) {
         synchronized (map) {
-            TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id);
+            TimeoutMapEntry entry = map.remove(id);
             if (entry != null) {
                 index.remove(entry);
             }
@@ -90,7 +92,7 @@
     public Object[] getKeys() {
         Object[] keys = null;
         synchronized (map) {
-            Set<Object> keySet = map.keySet();
+            Set<K> keySet = map.keySet();
             keys = new Object[keySet.size()];
             keySet.toArray(keys);
         }
@@ -108,21 +110,20 @@
      */
     public void run() {
         purge();
-        schedulePoll();
     }
 
     public void purge() {
         long now = currentTime();
         synchronized (map) {
-            for (Iterator<TimeoutMapEntry> iter = index.iterator(); 
iter.hasNext();) {
-                TimeoutMapEntry entry = (TimeoutMapEntry) iter.next();
+            for (Iterator<TimeoutMapEntry<K, V>> iter = index.iterator(); 
iter.hasNext();) {
+                TimeoutMapEntry<K, V> entry = iter.next();
                 if (entry == null) {
                     break;
                 }
                 if (entry.getExpireTime() < now) {
                     if (isValidForEviction(entry)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Evicting inactive request for 
correlationID: " + entry);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Evicting inactive request for 
correlationID: " + entry);
                         }
                         map.remove(entry.getKey());
                         iter.remove();
@@ -153,14 +154,14 @@
      */
     protected void schedulePoll() {
         if (executor != null) {
-            executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS);
+            executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, 
TimeUnit.MILLISECONDS);
         }
     }
 
     /**
      * A hook to allow derivations to avoid evicting the current entry
      */
-    protected boolean isValidForEviction(TimeoutMapEntry entry) {
+    protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) {
         return true;
     }
 
@@ -172,4 +173,15 @@
     protected long currentTime() {
         return System.currentTimeMillis();
     }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+        if (executor != null) {
+            executor.shutdown();
+        }
+        map.clear();
+        index.clear();
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=910231&r1=910230&r2=910231&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java 
Mon Feb 15 15:05:11 2010
@@ -22,7 +22,7 @@
  *
  * @version $Revision$
  */
-public interface TimeoutMap extends Runnable {
+public interface TimeoutMap<K, V> extends Runnable {
 
     /**
      * Looks up the value in the map by the given key.
@@ -30,7 +30,7 @@
      * @param key the key of the value to search for
      * @return the value for the given key or null if it is not present (or 
has timed out)
      */
-    Object get(Object key);
+    V get(K key);
 
     /**
      * Returns a copy of the keys in the map
@@ -46,14 +46,14 @@
      * Adds the key value pair into the map such that some time after the given
      * timeout the entry will be evicted
      */
-    void put(Object key, Object value, long timeoutMillis);
+    void put(K key, V value, long timeoutMillis);
 
     /**
      * Removes the object with the given key
      *
      * @param key  key for the object to remove
      */
-    void remove(Object key);
+    void remove(K key);
 
     /**
      * Purges any old entries from the map

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=910231&r1=910230&r2=910231&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java 
Mon Feb 15 15:05:11 2010
@@ -23,19 +23,19 @@
  *
  * @version $Revision$
  */
-public class TimeoutMapEntry implements Comparable<Object>, Map.Entry<Object, 
Object> {
-    private Object key;
-    private Object value;
+public class TimeoutMapEntry<K, V> implements Comparable<Object>, Map.Entry<K, 
V> {
+    private K key;
+    private V value;
     private long timeout;
     private long expireTime;
 
-    public TimeoutMapEntry(Object id, Object handler, long timeout) {
+    public TimeoutMapEntry(K id, V handler, long timeout) {
         this.key = id;
         this.value = handler;
         this.timeout = timeout;
     }
 
-    public Object getKey() {
+    public K getKey() {
         return key;
     }
 
@@ -47,12 +47,12 @@
         this.expireTime = expireTime;
     }
 
-    public Object getValue() {
+    public V getValue() {
         return value;
     }
 
-    public Object setValue(Object value) {
-        Object oldValue = this.value;
+    public V setValue(V value) {
+        V oldValue = this.value;
         this.value = value;
         return oldValue;
     }
@@ -65,6 +65,7 @@
         this.timeout = timeout;
     }
 
+    @SuppressWarnings("unchecked")
     public int compareTo(Object that) {
         if (this == that) {
             return 0;
@@ -75,7 +76,7 @@
         return 1;
     }
 
-    public int compareTo(TimeoutMapEntry that) {
+    public int compareTo(TimeoutMapEntry<K, V> that) {
         long diff = this.expireTime - that.expireTime;
         if (diff > 0) {
             return 1;

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910231&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
 Mon Feb 15 15:05:11 2010
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Predicate;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.SendProcessor;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class AggregateProcessorTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateProcessorCompletionPredicate() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+END");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+        Predicate complete = body().contains("END");
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionPredicate(complete);
+        ap.setEagerEvaluateCompletionPredicate(false);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("END");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+    public void testAggregateProcessorCompletionPredicateEager() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+END");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+        Predicate complete = body().isEqualTo("END");
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionPredicate(complete);
+        ap.setEagerEvaluateCompletionPredicate(true);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("END");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+    public void testAggregateProcessorCompletionAggregatedSize() throws Exception {
+        doTestAggregateProcessorCompletionAggregatedSize(false);
+    }
+
+    public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception {
+        doTestAggregateProcessorCompletionAggregatedSize(true);
+    }
+
+    private void doTestAggregateProcessorCompletionAggregatedSize(boolean eager) throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+C");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionAggregatedSize(3);
+        ap.setEagerEvaluateCompletionPredicate(eager);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+    public void testAggregateProcessorCompletionTimeout() throws Exception {
+        doTestAggregateProcessorCompletionTimeout(false);
+    }
+
+    public void testAggregateProcessorCompletionTimeoutEager() throws Exception {
+        doTestAggregateProcessorCompletionTimeout(true);
+    }
+
+    private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+C");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+
+        AggregateProcessor ap = new AggregateProcessor(done, corr, as);
+        ap.setCompletionTimeout(3000);
+        ap.setEagerEvaluateCompletionPredicate(eager);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+
+        Thread.sleep(250);
+        ap.process(e2);
+
+        Thread.sleep(500);
+        ap.process(e3);
+
+        Thread.sleep(5000);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to