Author: davsclaus
Date: Tue Apr 12 14:36:52 2011
New Revision: 1091432

URL: http://svn.apache.org/viewvc?rev=1091432&view=rev
Log:
CAMEL-3424: Added skipDuplicate option to idempotent consumer EIP, to allow end 
users to handle duplicates in the routes.

Added:
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java
      - copied, changed from r1091302, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml
      - copied, changed from r1091302, 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1091432&r1=1091431&r2=1091432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Apr 
12 14:36:52 2011
@@ -99,6 +99,7 @@ public interface Exchange {
     String DEFAULT_CHARSET_PROPERTY  = "org.apache.camel.default.charset";
     String DESTINATION_OVERRIDE_URL = "CamelDestinationOverrideUrl";
     String DISABLE_HTTP_STREAM_CACHE = "CamelDisableHttpStreamCache";
+    String DUPLICATE_MESSAGE         = "CamelDuplicateMessage";
 
     String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
     String EXCEPTION_CAUGHT     = "CamelExceptionCaught";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1091432&r1=1091431&r2=1091432&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
 Tue Apr 12 14:36:52 2011
@@ -42,6 +42,8 @@ public class IdempotentConsumerDefinitio
     private String messageIdRepositoryRef;
     @XmlAttribute
     private Boolean eager;
+    @XmlAttribute
+    private Boolean skipDuplicate;
     @XmlTransient
     private IdempotentRepository<?> idempotentRepository;
 
@@ -108,6 +110,22 @@ public class IdempotentConsumerDefinitio
         return this;
     }
 
+    /**
+     * Sets whether to skip duplicates or not.
+     * <p/>
+     * The default behavior is to skip duplicates.
+     * <p/>
+     * A duplicate message would have the Exchange property {@link 
org.apache.camel.Exchange#DUPLICATE_MESSAGE} set
+     * to a {@link Boolean#TRUE} value. A none duplicate message will not have 
this property set.
+     *
+     * @param skipDuplicate  <tt>true</tt> to skip duplicates, <tt>false</tt> 
to allow duplicates.
+     * @return builder
+     */
+    public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) {
+        setSkipDuplicate(skipDuplicate);
+        return this;
+    }
+
     public String getMessageIdRepositoryRef() {
         return messageIdRepositoryRef;
     }
@@ -137,6 +155,19 @@ public class IdempotentConsumerDefinitio
         return eager != null ? eager : true;
     }
 
+    public Boolean getSkipDuplicate() {
+        return skipDuplicate;
+    }
+
+    public void setSkipDuplicate(Boolean skipDuplicate) {
+        this.skipDuplicate = skipDuplicate;
+    }
+
+    public boolean isSkipDuplicate() {
+        // defaults to true if not configured
+        return skipDuplicate != null ? skipDuplicate : true;
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
@@ -151,7 +182,7 @@ public class IdempotentConsumerDefinitio
 
         Expression expression = getExpression().createExpression(routeContext);
 
-        return new IdempotentConsumer(expression, idempotentRepository, 
isEager(), childProcessor);
+        return new IdempotentConsumer(expression, idempotentRepository, 
isEager(), isSkipDuplicate(), childProcessor);
     }
 
     /**
@@ -161,7 +192,7 @@ public class IdempotentConsumerDefinitio
      * @return the repository
      */
     protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext 
routeContext) {
-        if (idempotentRepository == null && messageIdRepositoryRef != null) {
+        if (messageIdRepositoryRef != null) {
             idempotentRepository = routeContext.lookup(messageIdRepositoryRef, 
IdempotentRepository.class);
         }
         return idempotentRepository;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1091432&r1=1091431&r2=1091432&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Tue Apr 12 14:36:52 2011
@@ -1125,7 +1125,22 @@ public abstract class ProcessorDefinitio
      * to avoid duplicate messages
      *
      * @param messageIdExpression  expression to test of duplicate messages
-     * @param idempotentRepository  the repository to use for duplicate chedck
+     * @return the builder
+     */
+    public IdempotentConsumerDefinition idempotentConsumer(Expression 
messageIdExpression) {
+        IdempotentConsumerDefinition answer = new 
IdempotentConsumerDefinition();
+        answer.setExpression(new ExpressionDefinition(messageIdExpression));
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/idempotent-consumer.html";>Idempotent 
consumer EIP:</a>
+     * Creates an {@link 
org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
+     * to avoid duplicate messages
+     *
+     * @param messageIdExpression  expression to test of duplicate messages
+     * @param idempotentRepository  the repository to use for duplicate check
      * @return the builder
      */
     public IdempotentConsumerDefinition idempotentConsumer(Expression 
messageIdExpression, IdempotentRepository<?> idempotentRepository) {
@@ -1139,9 +1154,11 @@ public abstract class ProcessorDefinitio
      * Creates an {@link 
org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
      * to avoid duplicate messages
      *
-     * @param idempotentRepository the repository to use for duplicate chedck
+     * @param idempotentRepository the repository to use for duplicate check
      * @return the builder used to create the expression
+     * @deprecated use any of the other methods
      */
+    @Deprecated
     public ExpressionClause<IdempotentConsumerDefinition> 
idempotentConsumer(IdempotentRepository<?> idempotentRepository) {
         IdempotentConsumerDefinition answer = new 
IdempotentConsumerDefinition();
         answer.setMessageIdRepository(idempotentRepository);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1091432&r1=1091431&r2=1091432&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
 Tue Apr 12 14:36:52 2011
@@ -45,12 +45,14 @@ public class IdempotentConsumer extends 
     private final AsyncProcessor processor;
     private final IdempotentRepository<String> idempotentRepository;
     private final boolean eager;
+    private final boolean skipDuplicate;
 
     public IdempotentConsumer(Expression messageIdExpression, 
IdempotentRepository<String> idempotentRepository,
-                              boolean eager, Processor processor) {
+                              boolean eager, boolean skipDuplicate, Processor 
processor) {
         this.messageIdExpression = messageIdExpression;
         this.idempotentRepository = idempotentRepository;
         this.eager = eager;
+        this.skipDuplicate = skipDuplicate;
         this.processor = AsyncProcessorTypeConverter.convert(processor);
     }
 
@@ -78,11 +80,21 @@ public class IdempotentConsumer extends 
             newKey = !idempotentRepository.contains(messageId);
         }
 
+
+        if (!newKey) {
+            // mark the exchange as duplicate
+            exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
+        }
+
         if (!newKey) {
             // we already have this key so its a duplicate message
             onDuplicateMessage(exchange, messageId);
-            callback.done(true);
-            return true;
+            if (skipDuplicate) {
+                // if we should skip duplicate then we are done
+                LOG.debug("Ignoring duplicate message with id: {} for 
exchange: {}", messageId, exchange);
+                callback.done(true);
+                return true;
+            }
         }
 
         // register our on completion callback
@@ -138,7 +150,7 @@ public class IdempotentConsumer extends 
      * @param messageId the message ID of this exchange
      */
     protected void onDuplicateMessage(Exchange exchange, String messageId) {
-        LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", 
messageId, exchange);
+        // noop
     }
 
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?rev=1091432&r1=1091431&r2=1091432&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java
 Tue Apr 12 14:36:52 2011
@@ -24,6 +24,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
 
 /**
  * @version 
@@ -60,6 +61,74 @@ public class IdempotentConsumerTest exte
         assertMockEndpointsSatisfied();
     }
 
+    public void testNotSkiDuplicate() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                IdempotentRepository repo = 
MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+                from("direct:start")
+                    
.idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
+                    .to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", 
"one", "three");
+        
resultEndpoint.message(0).property(Exchange.DUPLICATE_MESSAGE).isNull();
+        
resultEndpoint.message(1).property(Exchange.DUPLICATE_MESSAGE).isNull();
+        
resultEndpoint.message(2).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        
resultEndpoint.message(3).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        
resultEndpoint.message(4).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        
resultEndpoint.message(5).property(Exchange.DUPLICATE_MESSAGE).isNull();
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testNotSkiDuplicateWithFilter() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                IdempotentRepository repo = 
MemoryIdempotentRepository.memoryIdempotentRepository(200);
+
+                // START SNIPPET: e1
+                from("direct:start")
+                    // instruct idempotent consumer to not skip duplicates as 
we will filter then our self
+                    
.idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
+                    
.filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
+                        // filter out duplicate messages by sending them to 
someplace else and then stop
+                        .to("mock:duplicate")
+                        .stop()
+                    .end()
+                    // and here we process only new messages (no duplicates)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        });
+        context.start();
+
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+
+        getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", 
"one");
+        
getMockEndpoint("mock:duplicate").allMessages().property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
     public void testFailedExchangesNotAddedDeadLetterChannel() throws 
Exception {
         context.addRoutes(new RouteBuilder() {
             @Override

Added: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java?rev=1091432&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java
 Tue Apr 12 14:36:52 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.SpringTestSupport;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version 
+ */
+public class SpringIdempotentConsumerNoSkipDuplicateFilterTest extends 
SpringTestSupport {
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml");
+    }
+
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("one", "two", "three");
+
+        getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", 
"one");
+        
getMockEndpoint("mock:duplicate").allMessages().property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) {
+                // now lets fire in a message
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+
+}

Copied: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java
 (from r1091302, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java&r1=1091302&r2=1091432&rev=1091432&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java
 Tue Apr 12 14:36:52 2011
@@ -27,16 +27,22 @@ import org.springframework.context.suppo
 /**
  * @version 
  */
-public class SpringIdempotentConsumerTest extends SpringTestSupport {
+public class SpringIdempotentConsumerNoSkipDuplicateTest extends 
SpringTestSupport {
 
     @Override
     protected AbstractXmlApplicationContext createApplicationContext() {
-        return new 
ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml");
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml");
     }
 
     public void testDuplicateMessagesAreFilteredOut() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("one", "two", "three");
+        mock.expectedBodiesReceived("one", "two", "one", "two", "one", 
"three");
+        mock.message(0).property(Exchange.DUPLICATE_MESSAGE).isNull();
+        mock.message(1).property(Exchange.DUPLICATE_MESSAGE).isNull();
+        
mock.message(2).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        
mock.message(3).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        
mock.message(4).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE);
+        mock.message(5).property(Exchange.DUPLICATE_MESSAGE).isNull();
 
         sendMessage("1", "one");
         sendMessage("2", "two");

Added: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml?rev=1091432&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml
 Tue Apr 12 14:36:52 2011
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: e1 -->
+
+    <!-- idempotent repository, just use a memory based for testing -->
+    <bean id="myRepo" 
class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring";>
+        <route>
+            <from uri="direct:start"/>
+            <!-- we do not want to skip any duplicate messages -->
+            <idempotentConsumer messageIdRepositoryRef="myRepo" 
skipDuplicate="false">
+                <!-- use the messageId header as key for identifying duplicate 
messages -->
+                <header>messageId</header>
+                <!-- we will to handle duplicate messages using a filter -->
+                <filter>
+                    <!-- the filter will only react on duplicate messages, if 
this property is set on the Exchange -->
+                    <property>CamelDuplicateMessage</property>
+                    <!-- and send the message to this mock, due its part of an 
unit test -->
+                    <!-- but you can of course do anything as its part of the 
route -->
+                    <to uri="mock:duplicate"/>
+                    <!-- and then stop -->
+                    <stop/>
+                </filter>
+                <!-- here we route only new messages -->
+                <to uri="mock:result"/>
+            </idempotentConsumer>
+        </route>
+    </camelContext>
+    <!-- END SNIPPET: e1 -->
+
+</beans>

Copied: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml
 (from r1091302, 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml&r1=1091302&r2=1091432&rev=1091432&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml
 Tue Apr 12 14:36:52 2011
@@ -22,21 +22,18 @@
        http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <!-- START SNIPPET: e1 -->
-    <!-- repository for the idempotent consumer -->
     <bean id="myRepo" 
class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
 
     <camelContext xmlns="http://camel.apache.org/schema/spring";>
         <route>
             <from uri="direct:start"/>
-            <idempotentConsumer messageIdRepositoryRef="myRepo">
+            <!-- we do not want to skip any duplicate messages -->
+            <idempotentConsumer messageIdRepositoryRef="myRepo" 
skipDuplicate="false">
                 <!-- use the messageId header as key for identifying duplicate 
messages -->
                 <header>messageId</header>
-                <!-- if not a duplicate send it to this mock endpoint -->
                 <to uri="mock:result"/>
             </idempotentConsumer>
         </route>
     </camelContext>
-    <!-- END SNIPPET: e1 -->
 
 </beans>


Reply via email to