Author: davsclaus
Date: Sun Jun  7 11:21:00 2009
New Revision: 782371

URL: http://svn.apache.org/viewvc?rev=782371&view=rev
Log:
CAMEL-1650: idempotent consumer now eagerly adds to repository to detect 
duplicates of in-progress exchanges. Fixed the JPA unit test.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
    
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
    
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
    
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
 Sun Jun  7 11:21:00 2009
@@ -138,6 +138,16 @@
         }
     }
 
+    public boolean remove(String key) {
+        synchronized (cache) {
+            // init store if not loaded before
+            if (init.compareAndSet(false, true)) {
+                loadStore();
+            }
+            return cache.remove(key) != null;
+        }
+    }
+
     public File getFileStore() {
         return fileStore;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 Sun Jun  7 11:21:00 2009
@@ -42,8 +42,7 @@
     private final Processor processor;
     private final IdempotentRepository idempotentRepository;
 
-    public IdempotentConsumer(Expression messageIdExpression, 
IdempotentRepository idempotentRepository,
-                              Processor processor) {
+    public IdempotentConsumer(Expression messageIdExpression, 
IdempotentRepository idempotentRepository, Processor processor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
         this.processor = processor;
@@ -57,17 +56,21 @@
 
     @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws Exception {
-        String messageId = messageIdExpression.evaluate(exchange, 
String.class);
+        final String messageId = messageIdExpression.evaluate(exchange, 
String.class);
         if (messageId == null) {
             throw new NoMessageIdException(exchange, messageIdExpression);
         }
 
-        if (idempotentRepository.contains(messageId)) {
+        // add the key to the repository
+        boolean newKey = idempotentRepository.add(messageId);
+        if (!newKey) {
+            // we already have this key so it's a duplicate message
             onDuplicateMessage(exchange, messageId);
         } else {
             // register our on completion callback
             exchange.addOnCompletion(new 
IdempotentOnCompletion(idempotentRepository, messageId));
-            // process it first
+
+            // process the exchange
             processor.process(exchange);
         }
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java
 Sun Jun  7 11:21:00 2009
@@ -33,10 +33,10 @@
 public class IdempotentOnCompletion implements Synchronization {
 
     private static final transient Log LOG = 
LogFactory.getLog(IdempotentOnCompletion.class);
-    private final IdempotentRepository idempotentRepository;
+    private final IdempotentRepository<String> idempotentRepository;
     private final String messageId;
 
-    public IdempotentOnCompletion(IdempotentRepository idempotentRepository, 
String messageId) {
+    public IdempotentOnCompletion(IdempotentRepository<String> 
idempotentRepository, String messageId) {
         this.idempotentRepository = idempotentRepository;
         this.messageId = messageId;
     }
@@ -57,12 +57,8 @@
      * @param exchange the exchange
      * @param messageId the message ID of this exchange
      */
-    @SuppressWarnings("unchecked")
     protected void onCompletedMessage(Exchange exchange, String messageId) {
-        idempotentRepository.add(messageId);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Added to repository with id: " + messageId + " for 
exchange: " + exchange);
-        }
+        // noop
     }
 
     /**
@@ -73,8 +69,9 @@
      * @param messageId the message ID of this exchange
      */
     protected void onFailedMessage(Exchange exchange, String messageId) {
+        idempotentRepository.remove(messageId);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Not added to repository as exchange failed: " + 
exchange + " with id: " + messageId);
+            LOG.debug("Removed from repository as exchange failed: " + 
exchange + " with id: " + messageId);
         }
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java
 Sun Jun  7 11:21:00 2009
@@ -88,6 +88,12 @@
         }
     }
 
+    public boolean remove(String key) {
+        synchronized (cache) {
+            return cache.remove(key) != null;
+        }
+    }
+
     public Map<String, Object> getCache() {
         return cache;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java
 Sun Jun  7 11:21:00 2009
@@ -41,4 +41,12 @@
      * @return <tt>true</tt> if this repository contains the specified element
      */
     boolean contains(E key);
+
+    /**
+     * Removes the key from the repository.
+     *
+     * @param key the key of the message for duplicate test
+     * @return <tt>true</tt> if the key was removed
+     */
+    boolean remove(E key);
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java
 Sun Jun  7 11:21:00 2009
@@ -95,6 +95,10 @@
         public boolean contains(String key) {
             return invoked;
         }
+
+        public boolean remove(String key) {
+            return true;
+        }
     }
     
 }
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
 Sun Jun  7 11:21:00 2009
@@ -17,13 +17,12 @@
 package org.apache.camel.management;
 
 import java.util.Set;
-
 import javax.management.ObjectName;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.Processor;
-import org.apache.camel.Exchange;
 
 /**
  * A unit test to verify mbean registration of multi-instances of a processor

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java?rev=782371&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
 Sun Jun  7 11:21:00 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+/**
+ * Concurrency test for idempotent consumer
+ *
+ * @version $Revision$
+ */
+public class IdempotentConsumerConcurrentTest extends ContextTestSupport {
+    protected Endpoint startEndpoint;
+    protected MockEndpoint resultEndpoint;
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(header("messageId"),
+                        
MemoryIdempotentRepository.memoryIdempotentRepository(200)).to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testFailedExchangesNotAdded() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0)
+                        .logStackTrace(false));
+
+                from("direct:start").idempotentConsumer(header("messageId"),
+                        
MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                String id = 
exchange.getIn().getHeader("messageId", String.class);
+                                if (id.equals("2")) {
+                                    throw new IllegalArgumentException("Damm I 
cannot handle id 2");
+                                }
+                            }
+                        }).to("mock:result");
+            }
+        });
+        context.start();
+
+        // we send in 2 messages with id 2 that fail
+        getMockEndpoint("mock:error").expectedMessageCount(2);
+        resultEndpoint.expectedBodiesReceived("one", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * A multithreaded test for IdempotentConsumer filter
+     */
+    public void testThreadedIdempotentConsumer() throws Exception {
+        final int loopCount = 100;
+        final int threadCount = 10;
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").idempotentConsumer(header("messageId"),
+                        
MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                        .delay(1).to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.reset();
+        resultEndpoint.expectedMessageCount(loopCount);
+
+        final boolean failedFlag[] = new boolean[1];
+        failedFlag[0] = false;
+
+        Thread[] threads = new Thread[threadCount];
+        for (int i = 0; i < threadCount; i++) {
+            final int threadIndex = i;
+            threads[threadIndex] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        for (int j = 0; j < loopCount; j++) {
+                            sendMessage("" + j, "multithreadedTest" + j);
+                        }
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        failedFlag[0] = true;
+                    }
+                }
+            };
+            threads[i].start();
+        }
+        for (int i = 0; i < threadCount; i++) {
+            threads[i].join();
+        }
+        assertFalse("At least one thread threw an exception", failedFlag[0]);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send(startEndpoint, new Processor() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        startEndpoint = resolveMandatoryEndpoint("direct:start");
+        resultEndpoint = getMockEndpoint("mock:result");
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
 (original)
+++ 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
 Sun Jun  7 11:21:00 2009
@@ -89,6 +89,10 @@
         public boolean contains(String key) {
             return invoked;
         }
+
+        public boolean remove(String key) {
+            return true;
+        }
     }
 
 }
\ No newline at end of file

Modified: 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
 Sun Jun  7 11:21:00 2009
@@ -98,4 +98,21 @@
         return rc.booleanValue();
     }
 
+    public boolean remove(final String messageId) {
+        Boolean rc = (Boolean)transactionTemplate.execute(new 
TransactionCallback() {
+            public Object doInTransaction(TransactionStatus arg0) {
+                List list = jpaTemplate.find(QUERY_STRING, processorName, 
messageId);
+                if (list.isEmpty()) {
+                    return Boolean.FALSE;
+                } else {
+                    MessageProcessed processoed = (MessageProcessed) 
list.get(0);
+                    jpaTemplate.remove(processoed);
+                    jpaTemplate.flush();
+                    return Boolean.TRUE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
 }

Modified: 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java?rev=782371&r1=782370&r2=782371&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java
 Sun Jun  7 11:21:00 2009
@@ -19,8 +19,12 @@
 import java.util.List;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.IdempotentConsumerTest;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.jpa.MessageProcessed;
 import org.apache.camel.spring.SpringCamelContext;
 import org.apache.camel.spring.SpringRouteBuilder;
@@ -39,12 +43,14 @@
 /**
  * @version $Revision$
  */
-public class JpaIdempotentConsumerTest extends IdempotentConsumerTest {
+public class JpaIdempotentConsumerTest extends ContextTestSupport {
     protected static final String SELECT_ALL_STRING = "select x from " + 
MessageProcessed.class.getName() + " x where x.processorName = ?1";
     protected static final String PROCESSOR_NAME = "myProcessorName";
 
     protected ApplicationContext applicationContext;
     protected JpaTemplate jpaTemplate;
+    protected Endpoint startEndpoint;
+    protected MockEndpoint resultEndpoint;
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
@@ -54,17 +60,8 @@
     }
 
     @Override
-    protected RouteBuilder createRouteBuilder() {
-        // START SNIPPET: idempotent
-        return new SpringRouteBuilder() {
-            public void configure() {
-                from("direct:start").idempotentConsumer(
-                        header("messageId"),
-                        jpaMessageIdRepository(lookup(JpaTemplate.class), 
PROCESSOR_NAME)
-                ).to("mock:result");
-            }
-        };
-        // END SNIPPET: idempotent
+    public boolean isUseRouteBuilder() {
+        return false;
     }
 
     protected void cleanupRepository() {
@@ -85,4 +82,85 @@
             }
         });
     }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        startEndpoint = resolveMandatoryEndpoint("direct:start");
+        resultEndpoint = getMockEndpoint("mock:result");
+    }
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        context.addRoutes(new SpringRouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: idempotent
+                from("direct:start").idempotentConsumer(
+                        header("messageId"),
+                        jpaMessageIdRepository(lookup(JpaTemplate.class), 
PROCESSOR_NAME)
+                ).to("mock:result");
+                // END SNIPPET: idempotent
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testFailedExchangesNotAdded() throws Exception {
+        context.addRoutes(new SpringRouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+                from("direct:start").idempotentConsumer(
+                        header("messageId"),
+                        jpaMessageIdRepository(lookup(JpaTemplate.class), 
PROCESSOR_NAME)
+                ).process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String id = exchange.getIn().getHeader("messageId", 
String.class);
+                        if (id.equals("2")) {
+                            throw new IllegalArgumentException("Damm I cannot 
handle id 2");
+                        }
+                    }
+                }).to("mock:result");
+            }
+        });
+        context.start();
+
+        // we send in 2 messages with id 2 that fail
+        getMockEndpoint("mock:error").expectedMessageCount(2);
+        resultEndpoint.expectedBodiesReceived("one", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send(startEndpoint, new Processor() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
 }


Reply via email to