Repository: camel Updated Branches: refs/heads/camel-2.16.x e34882f58 -> 584725f48 refs/heads/master 6b2a7b12a -> 895c938ce
CAMEL-9346: camel-sql - Add transacted option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/895c938c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/895c938c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/895c938c Branch: refs/heads/master Commit: 895c938ced4f7c0f5a36683d0ccc159f5d6fa62d Parents: 6b2a7b1 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Nov 21 12:00:14 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Nov 21 12:00:14 2015 +0100 ---------------------------------------------------------------------- .../camel/component/sql/DefaultSqlEndpoint.java | 15 +++++++++++++++ .../org/apache/camel/component/sql/SqlConsumer.java | 11 +++++++++++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/895c938c/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java index 2de1d64..0f7d4ce 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java @@ -43,6 +43,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint { private String dataSourceRef; @UriParam(description = "Sets the DataSource to use to communicate with the database.") private DataSource dataSource; + @UriParam(label = "consumer", description = "Enables or disables transaction. 
If enabled then if processing an exchange failed then the consumer" + + "break out processing any further exchanges to cause a rollback eager.") + private boolean transacted; @UriParam(label = "producer", description = "Enables or disables batch mode") private boolean batch; @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll") @@ -125,6 +128,18 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint { this.jdbcTemplate = jdbcTemplate; } + public boolean isTransacted() { + return transacted; + } + + /** + * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer + * break out processing any further exchanges to cause a rollback eager + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + public boolean isBatch() { return batch; } http://git-wip-us.apache.org/repos/asf/camel/blob/895c938c/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java index ce8135a..40e0eb9 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java +++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java @@ -27,6 +27,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.RollbackExchangeException; import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -207,6 +208,16 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer { exchange.setException(e); } + if (getEndpoint().isTransacted() && 
exchange.isFailed()) { + // break out as we are transacted and should rollback + Exception cause = exchange.getException(); + if (cause != null) { + throw cause; + } else { + throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange); + } + } + // pick the on consume to use String sql = exchange.isFailed() ? onConsumeFailed : onConsume; try {