CAMEL-9480: IdempotentConsumer - If the repository throws an exception, it should be possible to handle it with onException
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5ee90c1a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5ee90c1a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5ee90c1a Branch: refs/heads/camel-2.15.x Commit: 5ee90c1aa8f61c3765f9c825fbb2385b591ab6ab Parents: 4ccd7de Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 5 09:49:47 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 5 10:25:15 2016 +0100 ---------------------------------------------------------------------- .../idempotent/IdempotentConsumer.java | 74 ++++++++++++-------- .../IdempotentConsumerRepoExceptionTest.java | 70 ++++++++++++++++++ 2 files changed, 113 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5ee90c1a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index d3afe7a..f534991 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -78,48 +78,60 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor } public boolean process(Exchange exchange, AsyncCallback callback) { - final String messageId = messageIdExpression.evaluate(exchange, String.class); - if (messageId == null) { - exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); + final String messageId; + try { + messageId = messageIdExpression.evaluate(exchange, String.class); + if (messageId == null) { + exchange.setException(new 
NoMessageIdException(exchange, messageIdExpression)); + callback.done(true); + return true; + } + } catch (Exception e) { + exchange.setException(e); callback.done(true); return true; } - boolean newKey; - if (eager) { - // add the key to the repository - if (idempotentRepository instanceof ExchangeIdempotentRepository) { - newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); + try { + boolean newKey; + if (eager) { + // add the key to the repository + if (idempotentRepository instanceof ExchangeIdempotentRepository) { + newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); + } else { + newKey = idempotentRepository.add(messageId); + } } else { - newKey = idempotentRepository.add(messageId); + // check if we already have the key + if (idempotentRepository instanceof ExchangeIdempotentRepository) { + newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); + } else { + newKey = !idempotentRepository.contains(messageId); + } } - } else { - // check if we already have the key - if (idempotentRepository instanceof ExchangeIdempotentRepository) { - newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); - } else { - newKey = !idempotentRepository.contains(messageId); - } - } - - if (!newKey) { - // mark the exchange as duplicate - exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); + if (!newKey) { + // mark the exchange as duplicate + exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); - // we already have this key so its a duplicate message - onDuplicate(exchange, messageId); + // we already have this key so its a duplicate message + onDuplicate(exchange, messageId); - if (skipDuplicate) { - // if we should skip duplicate then we are done - LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); - callback.done(true); - return true; + 
if (skipDuplicate) { + // if we should skip duplicate then we are done + LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); + callback.done(true); + return true; + } } - } - // register our on completion callback - exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure)); + // register our on completion callback + exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure)); + + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + } // process the exchange return processor.process(exchange, callback); http://git-wip-us.apache.org/repos/asf/camel/blob/5ee90c1a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java new file mode 100644 index 0000000..1cdfe26 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.IdempotentRepository; + +/** + * @version + */ +public class IdempotentConsumerRepoExceptionTest extends ContextTestSupport { + + private IdempotentRepository myRepo = new MyRepo(); + + public void testRepoException() throws Exception { + getMockEndpoint("mock:dead").expectedBodiesReceived("nineninenine"); + getMockEndpoint("mock:result").expectedBodiesReceived("one", "two", "three"); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "nineninenine", "messageId", "999"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .idempotentConsumer(header("messageId"), myRepo) + .to("mock:result"); + + } + }; + } + + private class MyRepo extends MemoryIdempotentRepository { + @Override + public boolean add(String key) { + if ("999".equals(key)) { + throw new IllegalArgumentException("Forced"); + } + return super.add(key); + } + } +}