CAMEL-9346: camel-sql - Add transacted option
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/584725f4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/584725f4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/584725f4

Branch: refs/heads/camel-2.16.x
Commit: 584725f481eb137e4e4a6e1cde8f80cc7f232115
Parents: e34882f
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Nov 21 12:00:14 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Nov 21 12:00:42 2015 +0100

----------------------------------------------------------------------
 .../camel/component/sql/DefaultSqlEndpoint.java | 15 +++++++++++++++
 .../org/apache/camel/component/sql/SqlConsumer.java | 11 +++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 2de1d64..0f7d4ce 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -43,6 +43,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
     private String dataSourceRef;
     @UriParam(description = "Sets the DataSource to use to communicate with the database.")
     private DataSource dataSource;
+    @UriParam(label = "consumer", description = "Enables or disables transaction. If enabled then if processing an exchange failed then the consumer"
+            + "break out processing any further exchanges to cause a rollback eager.")
+    private boolean transacted;
     @UriParam(label = "producer", description = "Enables or disables batch mode")
     private boolean batch;
     @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
@@ -125,6 +128,18 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
         this.jdbcTemplate = jdbcTemplate;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Enables or disables transaction. If enabled then if processing an exchange failed then the consumer
+     + break out processing any further exchanges to cause a rollback eager
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
     public boolean isBatch() {
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/584725f4/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index ce8135a..40e0eb9 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -27,6 +27,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -207,6 +208,16 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
                 exchange.setException(e);
             }
 
+            if (getEndpoint().isTransacted() && exchange.isFailed()) {
+                // break out as we are transacted and should rollback
+                Exception cause = exchange.getException();
+                if (cause != null) {
+                    throw cause;
+                } else {
+                    throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+                }
+            }
+
             // pick the on consume to use
             String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
             try {