Repository: camel Updated Branches: refs/heads/camel-2.16.x 584725f48 -> 142bd2441 refs/heads/master 895c938ce -> e1d368338
CAMEL-9346: camel-mybatis - Add transacted option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e1d36833 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e1d36833 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e1d36833 Branch: refs/heads/master Commit: e1d368338a46dc6a266decb250111d52f92486c9 Parents: 895c938 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Nov 21 12:04:00 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Nov 21 12:04:00 2015 +0100 ---------------------------------------------------------------------- .../camel/component/mybatis/MyBatisConsumer.java | 11 +++++++++++ .../camel/component/mybatis/MyBatisEndpoint.java | 15 +++++++++++++++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e1d36833/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java index a1a41ea..0f2fbd9 100644 --- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java +++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java @@ -24,6 +24,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.RollbackExchangeException; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; @@ -146,6 +147,16 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer { } catch (Exception e) { handleException(e); } + + if (getEndpoint().isTransacted() && exchange.isFailed()) { + // break out as we are transacted and should rollback + Exception cause = exchange.getException(); + if (cause != null) { + throw cause; + } else { + throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange); + } + } } return total; http://git-wip-us.apache.org/repos/asf/camel/blob/e1d36833/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java index e39ddb2..ce460c1 100644 --- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java +++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java @@ -42,6 +42,9 @@ public class MyBatisEndpoint extends DefaultPollingEndpoint { private String statement; @UriParam(label = "producer") private StatementType statementType; + @UriParam(label = "consumer", description = "Enables or disables transaction. If enabled then if processing an exchange failed then the consumer" + + "break out processing any further exchanges to cause a rollback eager.") + private boolean transacted; @UriParam(label = "consumer", defaultValue = "0") private int maxMessagesPerPoll; @UriParam @@ -128,6 +131,18 @@ public class MyBatisEndpoint extends DefaultPollingEndpoint { this.executorType = ExecutorType.valueOf(executorType.toUpperCase()); } + public boolean isTransacted() { + return transacted; + } + + /** + * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer + + break out processing any further exchanges to cause a rollback eager + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public MyBatisProcessingStrategy getProcessingStrategy() { return processingStrategy; }