Repository: camel Updated Branches: refs/heads/camel-2.15.x 4ccd7deb8 -> 5ee90c1aa refs/heads/camel-2.16.x 0e77534ba -> 9c95ebd05 refs/heads/master 6742fb77f -> 0ead2cac3
CAMEL-9480: IdempotentConsumer - If exception from repo it should be able to handle by onException Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ead2cac Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ead2cac Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ead2cac Branch: refs/heads/master Commit: 0ead2cac3cc9473487677f4aee9c872fbc358fc9 Parents: 6742fb7 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 5 09:49:47 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 5 09:49:47 2016 +0100 ---------------------------------------------------------------------- .../idempotent/IdempotentConsumer.java | 82 ++++++++++++-------- .../IdempotentConsumerRepoExceptionTest.java | 70 +++++++++++++++++ 2 files changed, 118 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0ead2cac/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index e28a214..7b64546 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -91,51 +91,65 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor } public boolean process(final Exchange exchange, final AsyncCallback callback) { - final String messageId = messageIdExpression.evaluate(exchange, String.class); - if (messageId == null) { - exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); + final AsyncCallback target; + + final String messageId; + try { + messageId = messageIdExpression.evaluate(exchange, String.class); + if (messageId == null) { + exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); + callback.done(true); + return true; + } + } catch (Exception e) { + exchange.setException(e); callback.done(true); return true; } - boolean newKey; - if (eager) { - // add the key to the repository - if (idempotentRepository instanceof ExchangeIdempotentRepository) { - newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); - } else { - newKey = idempotentRepository.add(messageId); - } - } else { - // check if we already have the key - if (idempotentRepository instanceof ExchangeIdempotentRepository) { - newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); + try { + boolean newKey; + if (eager) { + // add the key to the repository + if (idempotentRepository instanceof ExchangeIdempotentRepository) { + newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).add(exchange, messageId); + } else { + newKey = idempotentRepository.add(messageId); + } } else { - newKey = !idempotentRepository.contains(messageId); + // check if we already have the key + if (idempotentRepository instanceof ExchangeIdempotentRepository) { + newKey = ((ExchangeIdempotentRepository<String>) idempotentRepository).contains(exchange, messageId); + } else { + newKey = !idempotentRepository.contains(messageId); + } } - } + if (!newKey) { + // mark the exchange as duplicate + exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); - if (!newKey) { - // mark the exchange as duplicate - exchange.setProperty(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE); + // we already have this key so its a duplicate message + onDuplicate(exchange, messageId); - // we already have this key so its a duplicate message - onDuplicate(exchange, messageId); - - if (skipDuplicate) { - // if we should skip duplicate then we are done - LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); - callback.done(true); - return true; + if (skipDuplicate) { + // if we should skip duplicate then we are done + LOG.debug("Ignoring duplicate message with id: {} for exchange: {}", messageId, exchange); + callback.done(true); + return true; + } } - } - final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure); - final AsyncCallback target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager); - if (!completionEager) { - // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed - exchange.addOnCompletion(onCompletion); + final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure); + target = new IdempotentConsumerCallback(exchange, onCompletion, callback, completionEager); + if (!completionEager) { + // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed + exchange.addOnCompletion(onCompletion); + } + } catch (Exception e) { + exchange.setException(e); + callback.done(true); + return true; } // process the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/0ead2cac/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java new file mode 100644 index 0000000..1cdfe26 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerRepoExceptionTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.IdempotentRepository; + +/** + * @version + */ +public class IdempotentConsumerRepoExceptionTest extends ContextTestSupport { + + private IdempotentRepository myRepo = new MyRepo(); + + public void testRepoException() throws Exception { + getMockEndpoint("mock:dead").expectedBodiesReceived("nineninenine"); + getMockEndpoint("mock:result").expectedBodiesReceived("one", "two", "three"); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "nineninenine", "messageId", "999"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .idempotentConsumer(header("messageId"), myRepo) + .to("mock:result"); + + } + }; + } + + private class MyRepo extends MemoryIdempotentRepository { + @Override + public boolean add(String key) { + if ("999".equals(key)) { + throw new IllegalArgumentException("Forced"); + } + return super.add(key); + } + } +}