Author: davsclaus Date: Sun Jun 7 11:21:00 2009 New Revision: 782371 URL: http://svn.apache.org/viewvc?rev=782371&view=rev Log: CAMEL-1650: idempotent consumer now eagerly adds to repository to detect duplications of in progress exchanges. Fixed the jpa unit test.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java Sun Jun 7 11:21:00 2009 @@ -138,6 +138,16 @@ } } + public boolean remove(String key) { + synchronized (cache) { + // init store if not loaded before + if (init.compareAndSet(false, true)) { + loadStore(); + } + return cache.remove(key) != null; + } + } + public File getFileStore() { return fileStore; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Sun Jun 7 11:21:00 2009 @@ -42,8 +42,7 @@ private final Processor processor; private final IdempotentRepository idempotentRepository; - public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, - Processor processor) { + public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository, Processor processor) { this.messageIdExpression = messageIdExpression; this.idempotentRepository = idempotentRepository; this.processor = processor; @@ -57,17 +56,21 @@ @SuppressWarnings("unchecked") public void process(Exchange exchange) throws Exception { - String messageId = messageIdExpression.evaluate(exchange, String.class); + final String messageId = messageIdExpression.evaluate(exchange, String.class); if (messageId == null) { throw new NoMessageIdException(exchange, messageIdExpression); } - if (idempotentRepository.contains(messageId)) { + // add the key to the repository + boolean newKey = idempotentRepository.add(messageId); + if (!newKey) { + // we already have this key so its a duplicate message onDuplicateMessage(exchange, messageId); } else { // register our on completion callback exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId)); - // process it first + + // process the exchange processor.process(exchange); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentOnCompletion.java Sun Jun 7 11:21:00 2009 @@ -33,10 +33,10 @@ public class IdempotentOnCompletion implements Synchronization { private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class); - private final IdempotentRepository idempotentRepository; + private final IdempotentRepository<String> idempotentRepository; private final String messageId; - public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId) { + public IdempotentOnCompletion(IdempotentRepository<String> idempotentRepository, String messageId) { this.idempotentRepository = idempotentRepository; this.messageId = messageId; } @@ -57,12 +57,8 @@ * @param exchange the exchange * @param messageId the message ID of this exchange */ - @SuppressWarnings("unchecked") protected void onCompletedMessage(Exchange exchange, String messageId) { - idempotentRepository.add(messageId); - if (LOG.isDebugEnabled()) { - LOG.debug("Added to repository with id: " + messageId + " for exchange: " + exchange); - } + // noop } /** @@ -73,8 +69,9 @@ * @param messageId the message ID of this exchange */ protected void onFailedMessage(Exchange exchange, String messageId) { + idempotentRepository.remove(messageId); if (LOG.isDebugEnabled()) { - LOG.debug("Not added to repository as exchange failed: " + exchange + " with id: " + messageId); + LOG.debug("Removed from repository as exchange failed: " + exchange + " with id: " + messageId); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryIdempotentRepository.java Sun Jun 7 11:21:00 2009 @@ -88,6 +88,12 @@ } } + public boolean remove(String key) { + synchronized (cache) { + return cache.remove(key) != null; + } + } + public Map<String, Object> getCache() { return cache; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/IdempotentRepository.java Sun Jun 7 11:21:00 2009 @@ -41,4 +41,12 @@ * @return <tt>true</tt> if this repository contains the specified element */ boolean contains(E key); + + /** + * Removes the key from the repository. + * + * @param key the key of the message for duplicate test + * @return <tt>true</tt> if the key was removed + */ + boolean remove(E key); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentRefTest.java Sun Jun 7 11:21:00 2009 @@ -95,6 +95,10 @@ public boolean contains(String key) { return invoked; } + + public boolean remove(String key) { + return true; + } } } \ No newline at end of file Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java Sun Jun 7 11:21:00 2009 @@ -17,13 +17,12 @@ package org.apache.camel.management; import java.util.Set; - import javax.management.ObjectName; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.Processor; -import org.apache.camel.Exchange; /** * A unit test to verify mbean registration of multi-instances of a processor Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java?rev=782371&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java Sun Jun 7 11:21:00 2009 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; + +/** + * Concurreny test for idempotent consumer + * + * @version $Revision$ + */ +public class IdempotentConsumerConcurrentTest extends ContextTestSupport { + protected Endpoint startEndpoint; + protected MockEndpoint resultEndpoint; + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testDuplicateMessagesAreFilteredOut() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").idempotentConsumer(header("messageId"), + MemoryIdempotentRepository.memoryIdempotentRepository(200)).to("mock:result"); + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + public void testFailedExchangesNotAdded() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0) + .logStackTrace(false)); + + from("direct:start").idempotentConsumer(header("messageId"), + MemoryIdempotentRepository.memoryIdempotentRepository(200)) + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + String id = exchange.getIn().getHeader("messageId", String.class); + if (id.equals("2")) { + throw new IllegalArgumentException("Damm I cannot handle id 2"); + } + } + }).to("mock:result"); + } + }); + context.start(); + + // we send in 2 messages with id 2 that fails + getMockEndpoint("mock:error").expectedMessageCount(2); + resultEndpoint.expectedBodiesReceived("one", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + /** + * A multithreaded test for IdempotentConsumer filter + */ + public void testThreadedIdempotentConsumer() throws Exception { + final int loopCount = 100; + final int threadCount = 10; + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").idempotentConsumer(header("messageId"), + MemoryIdempotentRepository.memoryIdempotentRepository(200)) + .delay(1).to("mock:result"); + } + }); + context.start(); + + resultEndpoint.reset(); + resultEndpoint.expectedMessageCount(loopCount); + + final boolean failedFlag[] = new boolean[1]; + failedFlag[0] = false; + + Thread[] threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + threads[threadIndex] = new Thread() { + @Override + public void run() { + try { + for (int j = 0; j < loopCount; j++) { + sendMessage("" + j, "multithreadedTest" + j); + } + } catch (Throwable e) { + e.printStackTrace(); + failedFlag[0] = true; + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threadCount; i++) { + threads[i].join(); + } + assertFalse("At least one thread threw an exception", failedFlag[0]); + + assertMockEndpointsSatisfied(); + } + + protected void sendMessage(final Object messageId, final Object body) { + template.send(startEndpoint, new Processor() { + public void process(Exchange exchange) { + // now lets fire in a message + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + startEndpoint = resolveMandatoryEndpoint("direct:start"); + resultEndpoint = getMockEndpoint("mock:result"); + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerConcurrentTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java (original) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java Sun Jun 7 11:21:00 2009 @@ -89,6 +89,10 @@ public boolean contains(String key) { return invoked; } + + public boolean remove(String key) { + return true; + } } } \ No newline at end of file Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original) +++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Sun Jun 7 11:21:00 2009 @@ -98,4 +98,21 @@ return rc.booleanValue(); } + public boolean remove(final String messageId) { + Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus arg0) { + List list = jpaTemplate.find(QUERY_STRING, processorName, messageId); + if (list.isEmpty()) { + return Boolean.FALSE; + } else { + MessageProcessed processoed = (MessageProcessed) list.get(0); + jpaTemplate.remove(processoed); + jpaTemplate.flush(); + return Boolean.TRUE; + } + } + }); + return rc.booleanValue(); + } + } Modified: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java?rev=782371&r1=782370&r2=782371&view=diff ============================================================================== --- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java (original) +++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java Sun Jun 7 11:21:00 2009 @@ -19,8 +19,12 @@ import java.util.List; import org.apache.camel.CamelContext; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.processor.IdempotentConsumerTest; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.idempotent.jpa.MessageProcessed; import org.apache.camel.spring.SpringCamelContext; import org.apache.camel.spring.SpringRouteBuilder; @@ -39,12 +43,14 @@ /** * @version $Revision$ */ -public class JpaIdempotentConsumerTest extends IdempotentConsumerTest { +public class JpaIdempotentConsumerTest extends ContextTestSupport { protected static final String SELECT_ALL_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1"; protected static final String PROCESSOR_NAME = "myProcessorName"; protected ApplicationContext applicationContext; protected JpaTemplate jpaTemplate; + protected Endpoint startEndpoint; + protected MockEndpoint resultEndpoint; @Override protected CamelContext createCamelContext() throws Exception { @@ -54,17 +60,8 @@ } @Override - protected RouteBuilder createRouteBuilder() { - // START SNIPPET: idempotent - return new SpringRouteBuilder() { - public void configure() { - from("direct:start").idempotentConsumer( - header("messageId"), - jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME) - ).to("mock:result"); - } - }; - // END SNIPPET: idempotent + public boolean isUseRouteBuilder() { + return false; } protected void cleanupRepository() { @@ -85,4 +82,85 @@ } }); } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + startEndpoint = resolveMandatoryEndpoint("direct:start"); + resultEndpoint = getMockEndpoint("mock:result"); + } + + public void testDuplicateMessagesAreFilteredOut() throws Exception { + context.addRoutes(new SpringRouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: idempotent + from("direct:start").idempotentConsumer( + header("messageId"), + jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME) + ).to("mock:result"); + // END SNIPPET: idempotent + } + }); + context.start(); + + resultEndpoint.expectedBodiesReceived("one", "two", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + public void testFailedExchangesNotAdded() throws Exception { + context.addRoutes(new SpringRouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2).delay(0).logStackTrace(false)); + + from("direct:start").idempotentConsumer( + header("messageId"), + jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME) + ).process(new Processor() { + public void process(Exchange exchange) throws Exception { + String id = exchange.getIn().getHeader("messageId", String.class); + if (id.equals("2")) { + throw new IllegalArgumentException("Damm I cannot handle id 2"); + } + } + }).to("mock:result"); + } + }); + context.start(); + + // we send in 2 messages with id 2 that fails + getMockEndpoint("mock:error").expectedMessageCount(2); + resultEndpoint.expectedBodiesReceived("one", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + protected void sendMessage(final Object messageId, final Object body) { + template.send(startEndpoint, new Processor() { + public void process(Exchange exchange) { + // now lets fire in a message + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } + }