CAMEL-8761: Idempotent Consumer EIP - Allow to commit when EIP scope ends
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92897a09 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92897a09 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92897a09 Branch: refs/heads/master Commit: 92897a09f69323038f485ea4f4207b3dee891258 Parents: b51bda4 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 14 11:36:19 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 14 13:28:54 2015 +0200 ---------------------------------------------------------------------- .../model/IdempotentConsumerDefinition.java | 66 ++++++++- .../camel/model/IdempotentConsumerScope.java | 31 ++++ .../idempotent/IdempotentConsumer.java | 54 ++++++- .../resources/org/apache/camel/model/jaxb.index | 1 + .../processor/IdempotentConsumerScopeTest.java | 140 +++++++++++++++++++ 5 files changed, 286 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java index 5c64e04..244bc50 100644 --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java @@ -38,10 +38,13 @@ import org.apache.camel.util.ObjectHelper; @XmlRootElement(name = "idempotentConsumer") @XmlAccessorType(XmlAccessType.FIELD) public class IdempotentConsumerDefinition extends ExpressionNode { + @XmlAttribute(required = true) private String messageIdRepositoryRef; @XmlAttribute @Metadata(defaultValue = "true") private Boolean eager; + @XmlAttribute @Metadata(defaultValue 
= "OnCompletion") + private IdempotentConsumerScope scope; @XmlAttribute @Metadata(defaultValue = "true") private Boolean skipDuplicate; @XmlAttribute @Metadata(defaultValue = "true") @@ -106,6 +109,57 @@ public class IdempotentConsumerDefinition extends ExpressionNode { } /** + * Sets the scope of this idempotent consumer, i.e. where its boundaries end. + * <p/> + * The default mode is <tt>onCompletion</tt> which means the idempotent consumer will + * only trigger its completion at the end of the routing of the exchange, when the exchange completes. + * So if the exchange continues to be routed after the block ends, then whatever happens there <b>also</b> affects the state. + * For example, if the exchange fails due to an exception, then the state of the idempotent consumer will be rolled back. + * <p/> + * The other mode <tt>blockOnly</tt> means that the idempotent consumer will trigger its completion + * when the exchange reaches the end of the block of the idempotent consumer pattern. So if the exchange + * continues to be routed after the block ends, then whatever happens there does not affect the state. + * + * @param scope the scope to use + * @return builder + */ + public IdempotentConsumerDefinition scope(IdempotentConsumerScope scope) { + setScope(scope); + return this; + } + + /** + * Sets the scope of this idempotent consumer, i.e. where its boundaries end, to <tt>blockOnly</tt>. + * <p/> + * The <tt>blockOnly</tt> mode means that the idempotent consumer will trigger its completion + * when the exchange reaches the end of the block of the idempotent consumer pattern. So if the exchange + * continues to be routed after the block ends, then whatever happens there does not affect the state.
+ * + * @see #scope(IdempotentConsumerScope) + * @return builder + */ + public IdempotentConsumerDefinition scopeBlockOnly() { + setScope(IdempotentConsumerScope.BlockOnly); + return this; + } + + /** + * Sets the scope of this idempotent consumer, i.e. where its boundaries end, to <tt>onCompletion</tt>. + * <p/> + * The <tt>onCompletion</tt> mode means the idempotent consumer will + * only trigger its completion at the end of the routing of the exchange, when the exchange completes. + * So if the exchange continues to be routed after the block ends, then whatever happens there <b>also</b> affects the state. + * For example, if the exchange fails due to an exception, then the state of the idempotent consumer will be rolled back. + * + * @see #scope(IdempotentConsumerScope) + * @return builder + */ + public IdempotentConsumerDefinition scopeOnCompletion() { + setScope(IdempotentConsumerScope.OnCompletion); + return this; + } + + /** + * Sets whether to remove or keep the key on failure. + * <p/> + * The default behavior is to remove the key on failure.
@@ -185,6 +239,14 @@ public class IdempotentConsumerDefinition extends ExpressionNode { this.removeOnFailure = removeOnFailure; } + public IdempotentConsumerScope getScope() { + return scope; + } + + public void setScope(IdempotentConsumerScope scope) { + this.scope = scope; + } + @Override @SuppressWarnings("unchecked") public Processor createProcessor(RouteContext routeContext) throws Exception { @@ -203,8 +265,10 @@ public class IdempotentConsumerDefinition extends ExpressionNode { boolean eager = getEager() == null || getEager(); boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); + // and this is not true by default + boolean scopeBlockOnly = getScope() != null && IdempotentConsumerScope.BlockOnly == getScope(); - return new IdempotentConsumer(expression, idempotentRepository, eager, duplicate, remove, childProcessor); + return new IdempotentConsumer(expression, idempotentRepository, eager, duplicate, remove, scopeBlockOnly, childProcessor); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java new file mode 100644 index 0000000..b42f28b --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerScope.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlEnum; +import javax.xml.bind.annotation.XmlType; + +/** + * Represents the scopes supported by the idempotent consumer EIP + */ +@XmlType +@XmlEnum +public enum IdempotentConsumerScope { + + BlockOnly, OnCompletion + +} http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index 67d20d5..6b73e63 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -29,6 +29,7 @@ import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeIdempotentRepository; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.IdempotentRepository; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -58,15 +59,17 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor private final boolean eager; private final boolean skipDuplicate; private final boolean removeOnFailure; + private final boolean scopeBlockOnly; private 
final AtomicLong duplicateMessageCount = new AtomicLong(); public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository<String> idempotentRepository, - boolean eager, boolean skipDuplicate, boolean removeOnFailure, Processor processor) { + boolean eager, boolean skipDuplicate, boolean removeOnFailure, boolean scopeBlockOnly, Processor processor) { this.messageIdExpression = messageIdExpression; this.idempotentRepository = idempotentRepository; this.eager = eager; this.skipDuplicate = skipDuplicate; this.removeOnFailure = removeOnFailure; + this.scopeBlockOnly = scopeBlockOnly; this.processor = AsyncProcessorConverterHelper.convert(processor); } @@ -87,7 +90,7 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor AsyncProcessorHelper.process(this, exchange); } - public boolean process(Exchange exchange, AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { final String messageId = messageIdExpression.evaluate(exchange, String.class); if (messageId == null) { exchange.setException(new NoMessageIdException(exchange, messageIdExpression)); @@ -128,11 +131,15 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor } } - // register our on completion callback - exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure)); + final Synchronization onCompletion = new IdempotentOnCompletion(idempotentRepository, messageId, eager, removeOnFailure); + final AsyncCallback target = new IdempotentConsumerCallback(exchange, onCompletion, callback, scopeBlockOnly); + if (!scopeBlockOnly) { + // the scope is to do the idempotent completion work as an unit of work on the exchange when its done being routed + exchange.addOnCompletion(onCompletion); + } // process the exchange - return processor.process(exchange, callback); + return processor.process(exchange, target); } public List<Processor> next() { @@ 
-201,4 +208,41 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor // noop } + /** + * {@link org.apache.camel.AsyncCallback} that is invoked when the idempotent consumer block ends + */ + private static class IdempotentConsumerCallback implements AsyncCallback { + private final Exchange exchange; + private final Synchronization onCompletion; + private final AsyncCallback callback; + private final boolean scopeBlockOnly; + + public IdempotentConsumerCallback(Exchange exchange, Synchronization onCompletion, AsyncCallback callback, boolean scopeBlockOnly) { + this.exchange = exchange; + this.onCompletion = onCompletion; + this.callback = callback; + this.scopeBlockOnly = scopeBlockOnly; + } + + @Override + public void done(boolean doneSync) { + try { + if (scopeBlockOnly) { + if (exchange.isFailed()) { + onCompletion.onFailure(exchange); + } else { + onCompletion.onComplete(exchange); + } + } + // if scope is not block only then the onCompletion is invoked as part of the UoW of the Exchange + } finally { + callback.done(doneSync); + } + } + + @Override + public String toString() { + return "IdempotentConsumerCallback"; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/main/resources/org/apache/camel/model/jaxb.index ---------------------------------------------------------------------- diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index index fc79da2..5e6a3c0 100644 --- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index +++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index @@ -31,6 +31,7 @@ FilterDefinition FinallyDefinition FromDefinition IdempotentConsumerDefinition +IdempotentConsumerScope InOnlyDefinition InOutDefinition InterceptDefinition 
http://git-wip-us.apache.org/repos/asf/camel/blob/92897a09/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java new file mode 100644 index 0000000..21b727a --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerScopeTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; +import org.apache.camel.spi.IdempotentRepository; + +/** + * @version + */ +public class IdempotentConsumerScopeTest extends ContextTestSupport { + protected Endpoint startEndpoint; + protected MockEndpoint resultEndpoint; + protected MockEndpoint a; + protected MockEndpoint b; + protected MockEndpoint dead; + protected IdempotentRepository<String> repo; + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testScopeBlockOnly() throws Exception { + repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .idempotentConsumer(header("messageId"), repo).scopeBlockOnly() + .to("log:a", "mock:a") + .to("log:b", "mock:b") + .end() + .filter(simple("${header.messageId} == '2'")) + .throwException(new IllegalArgumentException("Forced")) + .end() + .to("log:result", "mock:result"); + } + }); + context.start(); + + // we are on block only scope as "two" was success in the block, and then "two" failed afterwards does not matter + // the idempotent consumer will not receive "two" again + a.expectedBodiesReceived("one", "two", "three"); + b.expectedBodiesReceived("one", "two", "three"); + dead.expectedBodiesReceived("two", "two"); + resultEndpoint.expectedBodiesReceived("one", "one", "one", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + 
assertMockEndpointsSatisfied(); + } + + public void testScopeOnCompletion() throws Exception { + repo = MemoryIdempotentRepository.memoryIdempotentRepository(200); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .idempotentConsumer(header("messageId"), repo).scopeOnCompletion() + .to("log:a", "mock:a") + .to("log:b", "mock:b") + .end() + .filter(simple("${header.messageId} == '2'")) + .throwException(new IllegalArgumentException("Forced")) + .end() + .to("log:result", "mock:result"); + } + }); + context.start(); + + // we are on completion scope so the "two" will rollback and therefore the idempotent consumer receives those again + a.expectedBodiesReceived("one", "two", "two", "three"); + b.expectedBodiesReceived("one", "two", "two", "three"); + dead.expectedBodiesReceived("two", "two"); + resultEndpoint.expectedBodiesReceived("one", "one", "one", "three"); + + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("2", "two"); + sendMessage("1", "one"); + sendMessage("3", "three"); + + assertMockEndpointsSatisfied(); + } + + protected void sendMessage(final Object messageId, final Object body) { + template.send(startEndpoint, new Processor() { + public void process(Exchange exchange) { + // now lets fire in a message + Message in = exchange.getIn(); + in.setBody(body); + in.setHeader("messageId", messageId); + } + }); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + startEndpoint = resolveMandatoryEndpoint("direct:start"); + resultEndpoint = getMockEndpoint("mock:result"); + a = getMockEndpoint("mock:a"); + b = getMockEndpoint("mock:b"); + dead = getMockEndpoint("mock:dead"); + } + +} \ No newline at end of file