Author: davsclaus Date: Tue Apr 12 14:36:52 2011 New Revision: 1091432 URL: http://svn.apache.org/viewvc?rev=1091432&view=rev Log: CAMEL-3424: Added skipDuplicate option to idempotent consumer EIP, to allow end users to handle duplicates in the routes.
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java - copied, changed from r1091302, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml - copied, changed from r1091302, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1091432&r1=1091431&r2=1091432&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Tue Apr 12 14:36:52 2011 @@ -99,6 +99,7 @@ public interface Exchange { String DEFAULT_CHARSET_PROPERTY = "org.apache.camel.default.charset"; String DESTINATION_OVERRIDE_URL = "CamelDestinationOverrideUrl"; String 
DISABLE_HTTP_STREAM_CACHE = "CamelDisableHttpStreamCache"; + String DUPLICATE_MESSAGE = "CamelDuplicateMessage"; String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled"; String EXCEPTION_CAUGHT = "CamelExceptionCaught"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java?rev=1091432&r1=1091431&r2=1091432&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java Tue Apr 12 14:36:52 2011 @@ -42,6 +42,8 @@ public class IdempotentConsumerDefinitio private String messageIdRepositoryRef; @XmlAttribute private Boolean eager; + @XmlAttribute + private Boolean skipDuplicate; @XmlTransient private IdempotentRepository<?> idempotentRepository; @@ -108,6 +110,22 @@ public class IdempotentConsumerDefinitio return this; } + /** + * Sets whether to skip duplicates or not. + * <p/> + * The default behavior is to skip duplicates. + * <p/> + * A duplicate message would have the Exchange property {@link org.apache.camel.Exchange#DUPLICATE_MESSAGE} set + * to a {@link Boolean#TRUE} value. A non-duplicate message will not have this property set. + * + * @param skipDuplicate <tt>true</tt> to skip duplicates, <tt>false</tt> to allow duplicates. + * @return builder + */ + public IdempotentConsumerDefinition skipDuplicate(boolean skipDuplicate) { + setSkipDuplicate(skipDuplicate); + return this; + } + public String getMessageIdRepositoryRef() { return messageIdRepositoryRef; } @@ -137,6 +155,19 @@ public class IdempotentConsumerDefinitio return eager != null ? 
eager : true; } + public Boolean getSkipDuplicate() { + return skipDuplicate; + } + + public void setSkipDuplicate(Boolean skipDuplicate) { + this.skipDuplicate = skipDuplicate; + } + + public boolean isSkipDuplicate() { + // defaults to true if not configured + return skipDuplicate != null ? skipDuplicate : true; + } + @Override @SuppressWarnings("unchecked") public Processor createProcessor(RouteContext routeContext) throws Exception { @@ -151,7 +182,7 @@ public class IdempotentConsumerDefinitio Expression expression = getExpression().createExpression(routeContext); - return new IdempotentConsumer(expression, idempotentRepository, isEager(), childProcessor); + return new IdempotentConsumer(expression, idempotentRepository, isEager(), isSkipDuplicate(), childProcessor); } /** @@ -161,7 +192,7 @@ public class IdempotentConsumerDefinitio * @return the repository */ protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) { - if (idempotentRepository == null && messageIdRepositoryRef != null) { + if (messageIdRepositoryRef != null) { idempotentRepository = routeContext.lookup(messageIdRepositoryRef, IdempotentRepository.class); } return idempotentRepository; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1091432&r1=1091431&r2=1091432&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Tue Apr 12 14:36:52 2011 @@ -1125,7 +1125,22 @@ public abstract class ProcessorDefinitio * to avoid duplicate messages * * @param messageIdExpression expression to test of duplicate messages - * @param idempotentRepository the repository to use for duplicate chedck + 
* @return the builder + */ + public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression) { + IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(); + answer.setExpression(new ExpressionDefinition(messageIdExpression)); + addOutput(answer); + return answer; + } + + /** + * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a> + * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} + * to avoid duplicate messages + * + * @param messageIdExpression expression to test of duplicate messages + * @param idempotentRepository the repository to use for duplicate check * @return the builder */ public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) { @@ -1139,9 +1154,11 @@ public abstract class ProcessorDefinitio * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} * to avoid duplicate messages * - * @param idempotentRepository the repository to use for duplicate chedck + * @param idempotentRepository the repository to use for duplicate check * @return the builder used to create the expression + * @deprecated use any of the other methods */ + @Deprecated public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository<?> idempotentRepository) { IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(); answer.setMessageIdRepository(idempotentRepository); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=1091432&r1=1091431&r2=1091432&view=diff ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Tue Apr 12 14:36:52 2011 @@ -45,12 +45,14 @@ public class IdempotentConsumer extends private final AsyncProcessor processor; private final IdempotentRepository<String> idempotentRepository; private final boolean eager; + private final boolean skipDuplicate; public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository, - boolean eager, Processor processor) { + boolean eager, boolean skipDuplicate, Processor processor) { this.messageIdExpression = messageIdExpression; this.idempotentRepository = idempotentRepository; this.eager = eager; + this.skipDuplicate = skipDuplicate; this.processor = AsyncProcessorTypeConverter.convert(processor); } @@ -78,11 +80,21 @@ public class IdempotentConsumer extends newKey = !idempotentRepository.contains(messageId); } + + if (!newKey) { + // mark the exchange as duplicate + exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); + } + if (!newKey) { // we already have this key so its a duplicate message onDuplicateMessage(exchange, messageId); - callback.done(true); - return true; + if (skipDuplicate) { + // if we should skip duplicate then we are done + LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); + callback.done(true); + return true; + } } // register our on completion callback @@ -138,7 +150,7 @@ public class IdempotentConsumer extends * @param messageId the message ID of this exchange */ protected void onDuplicateMessage(Exchange exchange, String messageId) { - LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); + // noop } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?rev=1091432&r1=1091431&r2=1091432&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java Tue Apr 12 14:36:52 2011 @@ -24,6 +24,7 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.IdempotentRepository; /** * @version @@ -60,6 +61,74 @@ public class IdempotentConsumerTest exte assertMockEndpointsSatisfied(); } + public void testNotSkiDuplicate() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + + from("direct:start") + .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) + .to("mock:result"); + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "one", "two", "one", "three"); + resultEndpoint.message(0).property(Exchange.DUPLICATE_MESSAGE).isNull(); + resultEndpoint.message(1).property(Exchange.DUPLICATE_MESSAGE).isNull(); + resultEndpoint.message(2).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + resultEndpoint.message(3).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + resultEndpoint.message(4).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + resultEndpoint.message(5).property(Exchange.DUPLICATE_MESSAGE).isNull(); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + 
sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + public void testNotSkiDuplicateWithFilter() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + IdempotentRepository repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + + // START SNIPPET: e1 + from("direct:start") + // instruct idempotent consumer to not skip duplicates as we will filter them ourselves + .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false) + .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)) + // filter out duplicate messages by sending them to someplace else and then stop + .to("mock:duplicate") + .stop() + .end() + // and here we process only new messages (no duplicates) + .to("mock:result"); + // END SNIPPET: e1 + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + + getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one"); + getMockEndpoint("mock:duplicate").allMessages().property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + public void testFailedExchangesNotAddedDeadLetterChannel() throws Exception { context.addRoutes(new RouteBuilder() { @Override Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java?rev=1091432&view=auto ============================================================================== --- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java (added) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.java Tue Apr 12 14:36:52 2011 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spring.SpringTestSupport; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @version + */ +public class SpringIdempotentConsumerNoSkipDuplicateFilterTest extends SpringTestSupport { + + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml"); + } + + public void testDuplicateMessagesAreFilteredOut() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("one", "two", "three"); + + getMockEndpoint("mock:duplicate").expectedBodiesReceived("one", "two", "one"); + getMockEndpoint("mock:duplicate").allMessages().property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + protected void sendMessage(final Object messageId, final Object body) { + template.send("direct:start", new Processor() { + public void process(Exchange exchange) { + // now lets fire in a message + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } + +} Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java (from r1091302, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java&r1=1091302&r2=1091432&rev=1091432&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.java Tue Apr 12 14:36:52 2011 @@ -27,16 +27,22 @@ import org.springframework.context.suppo /** * @version */ -public class SpringIdempotentConsumerTest extends SpringTestSupport { +public class SpringIdempotentConsumerNoSkipDuplicateTest extends SpringTestSupport { @Override protected AbstractXmlApplicationContext createApplicationContext() { - return new ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml"); + return new ClassPathXmlApplicationContext("org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml"); } public void testDuplicateMessagesAreFilteredOut() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("one", "two", "three"); + mock.expectedBodiesReceived("one", "two", "one", "two", "one", "three"); + mock.message(0).property(Exchange.DUPLICATE_MESSAGE).isNull(); + mock.message(1).property(Exchange.DUPLICATE_MESSAGE).isNull(); + mock.message(2).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + mock.message(3).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + 
mock.message(4).property(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE); + mock.message(5).property(Exchange.DUPLICATE_MESSAGE).isNull(); sendMessage("1", "one"); sendMessage("2", "two"); Added: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml?rev=1091432&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml (added) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateFilterTest.xml Tue Apr 12 14:36:52 2011 @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <!-- START SNIPPET: e1 --> + + <!-- idempotent repository, just use a memory-based one for testing --> + <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:start"/> + <!-- we do not want to skip any duplicate messages --> + <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false"> + <!-- use the messageId header as key for identifying duplicate messages --> + <header>messageId</header> + <!-- we want to handle duplicate messages using a filter --> + <filter> + <!-- the filter will only react on duplicate messages, if this property is set on the Exchange --> + <property>CamelDuplicateMessage</property> + <!-- and send the message to this mock, as it is part of a unit test --> + <!-- but you can of course do anything as it is part of the route --> + <to uri="mock:duplicate"/> + <!-- and then stop --> + <stop/> + </filter> + <!-- here we route only new messages --> + <to uri="mock:result"/> + </idempotentConsumer> + </route> + </camelContext> + <!-- END SNIPPET: e1 --> + +</beans> Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml (from r1091302, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml&r1=1091302&r2=1091432&rev=1091432&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringIdempotentConsumerNoSkipDuplicateTest.xml Tue Apr 12 14:36:52 2011 @@ -22,21 +22,18 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <!-- START SNIPPET: e1 --> - <!-- repository for the idempotent consumer --> <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <idempotentConsumer messageIdRepositoryRef="myRepo"> + <!-- we do not want to skip any duplicate messages --> + <idempotentConsumer messageIdRepositoryRef="myRepo" skipDuplicate="false"> <!-- use the messageId header as key for identifying duplicate messages --> <header>messageId</header> - <!-- if not a duplicate send it to this mock endpoint --> <to uri="mock:result"/> </idempotentConsumer> </route> </camelContext> - <!-- END SNIPPET: e1 --> </beans>