CAMEL-10685: TransactionErrorHandler and TransactionPolicy for Camel CDI / JavaEE
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8092e89f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8092e89f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8092e89f Branch: refs/heads/master Commit: 8092e89f63515889bbd26457d1380b5c6cf68eb1 Parents: cc4ccd4 Author: Stephan Pelikan <stephan.peli...@wdw-elab.de> Authored: Thu Apr 13 11:11:52 2017 +0200 Committer: Antonin Stefanutti <anto...@stefanutti.fr> Committed: Thu Apr 13 16:52:19 2017 +0200 ---------------------------------------------------------------------- components/camel-cdi/pom.xml | 8 + .../apache/camel/cdi/CdiCamelConfiguration.java | 13 + .../camel/cdi/CdiCamelConfigurationEvent.java | 17 +- .../org/apache/camel/cdi/CdiCamelExtension.java | 22 +- .../org/apache/camel/cdi/JtaRouteBuilder.java | 24 ++ .../cdi/jta/JtaTransactionErrorHandler.java | 48 +++ .../jta/JtaTransactionErrorHandlerBuilder.java | 164 ++++++++ .../camel/cdi/jta/JtaTransactionPolicy.java | 136 +++++++ .../cdi/jta/MandatoryJtaTransactionPolicy.java | 18 + .../cdi/jta/NestedJtaTransactionPolicy.java | 44 +++ .../cdi/jta/NeverJtaTransactionPolicy.java | 18 + .../jta/NotSupportedJtaTransactionPolicy.java | 24 ++ .../cdi/jta/RequiredJtaTransactionPolicy.java | 15 + .../jta/RequiresNewJtaTransactionPolicy.java | 24 ++ .../cdi/jta/SupportsJtaTransactionPolicy.java | 15 + .../camel/cdi/jta/TransactedDefinition.java | 18 + .../camel/cdi/jta/TransactionErrorHandler.java | 370 +++++++++++++++++++ .../jta/TransactionalJtaTransactionPolicy.java | 121 ++++++ parent/pom.xml | 1 + 19 files changed, 1088 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-cdi/pom.xml b/components/camel-cdi/pom.xml index c54521e..9fb19d7 
100644 --- a/components/camel-cdi/pom.xml +++ b/components/camel-cdi/pom.xml @@ -115,6 +115,14 @@ <optional>true</optional> </dependency> + <dependency> + <groupId>javax.transaction</groupId> + <artifactId>javax.transaction-api</artifactId> + <version>${jta-api-1.2-version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java index 4ea53c6..d14ccf0 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfiguration.java @@ -38,4 +38,17 @@ public interface CdiCamelConfiguration { * @return Current state of autoConfigureRoutes parameter. */ boolean autoConfigureRoutes(); + + /** + * Overrides the Camel CDI behavior to automatically start all Camel contexts. + * @return this Camel CDI configuration + * @throws IllegalStateException if called outside of the observer method invocation + */ + CdiCamelConfiguration autoStartContexts(boolean autoStartContexts); + + /** + * @return Current state of autoStartContexts parameter. 
+ */ + boolean autoStartContexts(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java index bf96ea0..2f1c7fc 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelConfigurationEvent.java @@ -19,6 +19,7 @@ package org.apache.camel.cdi; /* package-private */ final class CdiCamelConfigurationEvent implements CdiCamelConfiguration { private boolean autoConfigureRoutes = true; + private boolean autoStartContexts = true; private volatile boolean unmodifiable; @Override @@ -33,14 +34,26 @@ package org.apache.camel.cdi; return autoConfigureRoutes; } + @Override + public CdiCamelConfiguration autoStartContexts(boolean autoStartContexts) { + throwsIfUnmodifiable(); + this.autoStartContexts = autoStartContexts; + return this; + } + + @Override + public boolean autoStartContexts() { + return autoStartContexts; + } + void unmodifiable() { unmodifiable = true; } private void throwsIfUnmodifiable() { if (unmodifiable) { - throw new IllegalStateException("Camel CDI configuration event must not be used outside " - + "its observer method!"); + throw new IllegalStateException( + "Camel CDI configuration event must not be used outside " + "its observer method!"); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java ---------------------------------------------------------------------- diff --git 
a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java index 54a5b9a..8862476 100755 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java @@ -415,16 +415,18 @@ public class CdiCamelExtension implements Extension { .forEach(bean -> getReference(manager, bean.getBeanClass(), bean).toString()); // Start Camel contexts - for (CamelContext context : contexts) { - if (ServiceStatus.Started.equals(context.getStatus())) { - continue; - } - logger.info("Camel CDI is starting Camel context [{}]", context.getName()); - try { - context.start(); - } catch (Exception exception) { - adv.addDeploymentProblem(exception); - } + if (configuration.autoStartContexts()) { + for (CamelContext context : contexts) { + if (ServiceStatus.Started.equals(context.getStatus())) { + continue; + } + logger.info("Camel CDI is starting Camel context [{}]", context.getName()); + try { + context.start(); + } catch (Exception exception) { + adv.addDeploymentProblem(exception); + } + } } // Clean-up http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java new file mode 100644 index 0000000..838a8a1 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/JtaRouteBuilder.java @@ -0,0 +1,24 @@ +package org.apache.camel.cdi; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.cdi.jta.JtaTransactionErrorHandlerBuilder; + +/** + * An extension of the {@link RouteBuilder} to provide some additional helper + * methods + * + * @version + 
*/ +public abstract class JtaRouteBuilder extends RouteBuilder { + + /** + * Creates a transaction error handler that will lookup in application + * context for an exiting transaction manager. + * + * @return the created error handler + */ + public JtaTransactionErrorHandlerBuilder transactionErrorHandler() { + return new JtaTransactionErrorHandlerBuilder(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java new file mode 100644 index 0000000..8a7f0d2 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandler.java @@ -0,0 +1,48 @@ +package org.apache.camel.cdi.jta; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.processor.RedeliveryPolicy; +import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; +import org.apache.camel.util.CamelLogger; + +/** + * This error handler does redelivering. If the transaction fails it can be + * retried if configured to do so. In the Spring implementation redelivering is + * done within the transaction which is not appropriate in JTA since every error + * breaks the current transaction. 
+ */ +public class JtaTransactionErrorHandler extends org.apache.camel.processor.RedeliveryErrorHandler { + + public JtaTransactionErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, + Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, + ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy, + Predicate retryWhile, ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel, + Processor onExceptionOccurredProcessor) { + + super(camelContext, + new TransactionErrorHandler(camelContext, + output, + exceptionPolicyStrategy, + transactionPolicy, + executorService, + rollbackLoggingLevel), + logger, + redeliveryProcessor, + redeliveryPolicy, + null, + null, + false, + false, + retryWhile, + executorService, + null, + onExceptionOccurredProcessor); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java new file mode 100644 index 0000000..6977e9d --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionErrorHandlerBuilder.java @@ -0,0 +1,164 @@ +package org.apache.camel.cdi.jta; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.builder.DefaultErrorHandlerBuilder; +import org.apache.camel.builder.ErrorHandlerBuilder; +import org.apache.camel.spi.Policy; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.TransactedPolicy; +import org.apache.camel.util.CamelLogger; +import 
org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Builds transactional error handlers. This class is based on + * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}. + */ +public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class); + + private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED"; + + public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = JtaTransactionErrorHandlerBuilder.class.getName() + + "#rollbackLoggingLevel"; + + private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN; + + private JtaTransactionPolicy transactionPolicy; + + private String policyRef; + + @Override + public boolean supportTransacted() { + return true; + } + + @Override + public ErrorHandlerBuilder cloneBuilder() { + + final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder(); + cloneBuilder(answer); + return answer; + + } + + @Override + protected void cloneBuilder(DefaultErrorHandlerBuilder other) { + + super.cloneBuilder(other); + if (other instanceof JtaTransactionErrorHandlerBuilder) { + final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other; + transactionPolicy = otherTx.transactionPolicy; + rollbackLoggingLevel = otherTx.rollbackLoggingLevel; + } + + } + + public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception { + + // resolve policy reference, if given + if (transactionPolicy == null) { + + if (policyRef != null) { + + final TransactedDefinition transactedDefinition = new TransactedDefinition(); + transactedDefinition.setRef(policyRef); + final Policy policy = transactedDefinition.resolvePolicy(routeContext); + if (policy != null) { + if (!(policy instanceof JtaTransactionPolicy)) { + throw new RuntimeCamelException("The 
configured policy '" + policyRef + "' is of type '" + + policyRef.getClass().getName() + "' but an instance of '" + + JtaTransactionPolicy.class.getName() + "' is required!"); + } + transactionPolicy = (JtaTransactionPolicy) policy; + } + + } + + } + + // try to lookup default policy + if (transactionPolicy == null) { + + LOG.debug( + "No tranaction policiy configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); + + Map<String, TransactedPolicy> mapPolicy = routeContext.lookupByType(TransactedPolicy.class); + if (mapPolicy != null && mapPolicy.size() == 1) { + TransactedPolicy policy = mapPolicy.values().iterator().next(); + if (policy != null && policy instanceof JtaTransactionPolicy) { + transactionPolicy = ((JtaTransactionPolicy) policy); + } + } + + if (transactionPolicy == null) { + TransactedPolicy policy = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class); + if (policy != null && policy instanceof JtaTransactionPolicy) { + transactionPolicy = ((JtaTransactionPolicy) policy); + } + } + + if (transactionPolicy != null) { + LOG.debug("Found TransactionPolicy in registry to use: " + transactionPolicy); + } + + } + + ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this); + + final CamelContext camelContext = routeContext.getCamelContext(); + final Map<String, String> properties = camelContext.getProperties(); + if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) { + rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY)); + } + + JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext, + processor, + getLogger(), + getOnRedelivery(), + getRedeliveryPolicy(), + getExceptionPolicyStrategy(), + transactionPolicy, + getRetryWhilePolicy(camelContext), + getExecutorService(camelContext), + rollbackLoggingLevel, + getOnExceptionOccurred()); + + // configure error handler before we can use it + configure(routeContext, 
answer); + return answer; + + } + + public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) { + policyRef = ref; + return this; + } + + public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) { + this.transactionPolicy = transactionPolicy; + return this; + } + + public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) { + this.rollbackLoggingLevel = rollbackLoggingLevel; + return this; + } + + protected CamelLogger createLogger() { + return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR); + } + + @Override + public String toString() { + return "JtaTransactionErrorHandlerBuilder"; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java new file mode 100644 index 0000000..c4c70c2 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/JtaTransactionPolicy.java @@ -0,0 +1,136 @@ +package org.apache.camel.cdi.jta; + +import javax.annotation.Resource; +import javax.transaction.TransactionManager; + +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.builder.ErrorHandlerBuilder; +import org.apache.camel.builder.ErrorHandlerBuilderRef; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.TransactedPolicy; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sets a proper error handler. 
This class is based on + * {@link org.apache.camel.spring.spi.SpringTransactionPolicy}. + * <p> + * This class requires the resource {@link TransactionManager} to be available + * through JNDI url "java:/TransactionManager" + */ +public abstract class JtaTransactionPolicy implements TransactedPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionPolicy.class); + + public static interface Runnable { + void run() throws Throwable; + } + + @Resource(lookup = "java:/TransactionManager") + protected TransactionManager transactionManager; + + @Override + public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) { + // do not inherit since we create our own + // (otherwise the default error handler would be used two times + // because we inherit it on our own but only in case of a + // non-transactional + // error handler) + definition.setInheritErrorHandler(false); + } + + public abstract void run(final Runnable runnable) throws Throwable; + + @Override + public Processor wrap(RouteContext routeContext, Processor processor) { + + JtaTransactionErrorHandler answer; + + // the goal is to configure the error handler builder on the route as a + // transacted error handler, + // either its already a transacted or if not we replace it with a + // transacted one that we configure here + // and wrap the processor in the transacted error handler as we can have + // transacted routes that change + // propagation behavior, eg: from A required -> B -> requiresNew C + // (advanced use-case) + // if we should not support this we do not need to wrap the processor as + // we only need one transacted error handler + + // find the existing error handler builder + ErrorHandlerBuilder builder = (ErrorHandlerBuilder) routeContext.getRoute().getErrorHandlerBuilder(); + + // check if its a ref if so then do a lookup + if (builder instanceof ErrorHandlerBuilderRef) { + // its a reference to a error handler so lookup the reference + 
ErrorHandlerBuilderRef builderRef = (ErrorHandlerBuilderRef) builder; + String ref = builderRef.getRef(); + // only lookup if there was explicit an error handler builder + // configured + // otherwise its just the "default" that has not explicit been + // configured + // and if so then we can safely replace that with our transacted + // error handler + if (ErrorHandlerBuilderRef.isErrorHandlerBuilderConfigured(ref)) { + LOG.debug("Looking up ErrorHandlerBuilder with ref: {}", ref); + builder = (ErrorHandlerBuilder) ErrorHandlerBuilderRef.lookupErrorHandlerBuilder(routeContext, ref); + } + } + + JtaTransactionErrorHandlerBuilder txBuilder; + if ((builder != null) && builder.supportTransacted()) { + if (!(builder instanceof JtaTransactionErrorHandlerBuilder)) { + throw new RuntimeCamelException("The given transactional error handler builder '" + builder + + "' is not of type '" + JtaTransactionErrorHandlerBuilder.class.getName() + + "' which is required in this environment!"); + } + LOG.debug("The ErrorHandlerBuilder configured is a JtaTransactionErrorHandlerBuilder: {}", builder); + txBuilder = (JtaTransactionErrorHandlerBuilder) builder.cloneBuilder(); + } else { + LOG.debug( + "No or no transactional ErrorHandlerBuilder configured, will use default JtaTransactionErrorHandlerBuilder settings"); + txBuilder = new JtaTransactionErrorHandlerBuilder(); + } + + txBuilder.setTransactionPolicy(this); + + // use error handlers from the configured builder + if (builder != null) { + txBuilder.setErrorHandlers(routeContext, builder.getErrorHandlers(routeContext)); + } + + answer = createTransactionErrorHandler(routeContext, processor, txBuilder); + answer.setExceptionPolicy(txBuilder.getExceptionPolicyStrategy()); + // configure our answer based on the existing error handler + txBuilder.configure(routeContext, answer); + + // set the route to use our transacted error handler builder + routeContext.getRoute().setErrorHandlerBuilder(txBuilder); + + // return with wrapped 
transacted error handler + return answer; + + } + + protected JtaTransactionErrorHandler createTransactionErrorHandler(RouteContext routeContext, Processor processor, + ErrorHandlerBuilder builder) { + + JtaTransactionErrorHandler answer; + try { + answer = (JtaTransactionErrorHandler) builder.createErrorHandler(routeContext, processor); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + return answer; + + } + + @Override + public String toString() { + return getClass().getName(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java new file mode 100644 index 0000000..260ad69 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/MandatoryJtaTransactionPolicy.java @@ -0,0 +1,18 @@ +package org.apache.camel.cdi.jta; + +import javax.inject.Named; + +@Named("PROPAGATION_MANDATORY") +public class MandatoryJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public void run(final Runnable runnable) throws Exception { + + if (!hasActiveTransaction()) { + throw new IllegalStateException( + "Policy 'PROPAGATION_MANDATORY' is configured but no active transaction was found!"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java new file 
mode 100644 index 0000000..6ce116a --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NestedJtaTransactionPolicy.java @@ -0,0 +1,44 @@ +package org.apache.camel.cdi.jta; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Named; +import javax.transaction.Transaction; + +@Named("PROPAGATION_NESTED") +public class NestedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + private static final Logger logger = Logger.getLogger(NestedJtaTransactionPolicy.class.getCanonicalName()); + + @Override + public void run(final Runnable runnable) throws Throwable { + + Transaction suspendedTransaction = null; + boolean rollback = false; + try { + + suspendedTransaction = suspendTransaction(); + runWithTransaction(runnable, true); + + } catch (Throwable e) { + rollback = true; + throw e; + } finally { + try { + if (rollback) { + rollback(false); + } + } catch (Exception e) { + logger.log(Level.WARNING, "Could not do rollback of outer transaction", e); + } + try { + resumeTransaction(suspendedTransaction); + } catch (Exception e) { + logger.log(Level.WARNING, "Could not resume outer transaction", e); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java new file mode 100644 index 0000000..377c856 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NeverJtaTransactionPolicy.java @@ -0,0 +1,18 @@ +package org.apache.camel.cdi.jta; + +import javax.inject.Named; + +@Named("PROPAGATION_NEVER") +public class NeverJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public 
void run(final Runnable runnable) throws Exception { + + if (hasActiveTransaction()) { + throw new IllegalStateException( + "Policy 'PROPAGATION_NEVER' is configured but an active transaction was found!"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java new file mode 100644 index 0000000..c3c6bfc --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/NotSupportedJtaTransactionPolicy.java @@ -0,0 +1,24 @@ +package org.apache.camel.cdi.jta; + +import javax.inject.Named; +import javax.transaction.Transaction; + +@Named("PROPAGATION_NOT_SUPPORTED") +public class NotSupportedJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public void run(final Runnable runnable) throws Throwable { + + Transaction suspendedTransaction = null; + try { + + suspendedTransaction = suspendTransaction(); + runnable.run(); + + } finally { + resumeTransaction(suspendedTransaction); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java new file mode 100644 index 0000000..b40dd80 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiredJtaTransactionPolicy.java @@ -0,0 +1,15 @@ +package org.apache.camel.cdi.jta; + +import 
javax.inject.Named; + +@Named("PROPAGATION_REQUIRED") +public class RequiredJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public void run(final Runnable runnable) throws Throwable { + + runWithTransaction(runnable, !hasActiveTransaction()); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java new file mode 100644 index 0000000..4b1fa47 --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/RequiresNewJtaTransactionPolicy.java @@ -0,0 +1,24 @@ +package org.apache.camel.cdi.jta; + +import javax.inject.Named; +import javax.transaction.Transaction; + +@Named("PROPAGATION_REQUIRES_NEW") +public class RequiresNewJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public void run(final Runnable runnable) throws Throwable { + + Transaction suspendedTransaction = null; + try { + + suspendedTransaction = suspendTransaction(); + runWithTransaction(runnable, true); + + } finally { + resumeTransaction(suspendedTransaction); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java new file mode 100644 index 0000000..28ba016 --- /dev/null +++ 
b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/SupportsJtaTransactionPolicy.java @@ -0,0 +1,15 @@ +package org.apache.camel.cdi.jta; + +import javax.inject.Named; + +@Named("PROPAGATION_SUPPORTS") +public class SupportsJtaTransactionPolicy extends TransactionalJtaTransactionPolicy { + + @Override + public void run(final Runnable runnable) throws Throwable { + + runnable.run(); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java new file mode 100644 index 0000000..9d01cbe --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactedDefinition.java @@ -0,0 +1,18 @@ +package org.apache.camel.cdi.jta; + +import org.apache.camel.spi.Policy; +import org.apache.camel.spi.RouteContext; + +/** + * Used to expose the method 'resolvePolicy' used by + * {@link JtaTransactionErrorHandlerBuilder} to resolve configured policy + * references. 
+ */ +public class TransactedDefinition extends org.apache.camel.model.TransactedDefinition { + + @Override + public Policy resolvePolicy(RouteContext routeContext) { + return super.resolvePolicy(routeContext); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java new file mode 100644 index 0000000..651074e --- /dev/null +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionErrorHandler.java @@ -0,0 +1,370 @@ +package org.apache.camel.cdi.jta; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import javax.transaction.TransactionRolledbackException; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.processor.ErrorHandlerSupport; +import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; +import org.apache.camel.spi.ShutdownPrepared; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; + +/** + * Does transactional execution according given policy. This class is based on + * {@link org.apache.camel.spring.spi.TransactionErrorHandler} excluding + * redelivery functionality. In the Spring implementation redelivering is done + * within the transaction which is not appropriate in JTA since every error + * breaks the current transaction. 
+ */
+public class TransactionErrorHandler extends ErrorHandlerSupport
+        implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
+
+    // processor every exchange is handed to (see processByErrorHandler)
+    protected final Processor output;
+    // set by prepareShutdown() and cleared by doStart(); not read inside this class
+    protected volatile boolean preparingShutdown;
+
+    private ExceptionPolicyStrategy exceptionPolicy;
+
+    // policy that brackets the processing with a transaction (see doInTransactionTemplate)
+    private JtaTransactionPolicy transactionPolicy;
+
+    // identity hash of the policy passed to the constructor; used as the
+    // marker under which the unit of work is flagged as transacted
+    private final String transactionKey;
+
+    private final LoggingLevel rollbackLoggingLevel;
+
+    /**
+     * Creates the transaction error handler.
+     *
+     * @param camelContext
+     *            the camel context (not referenced by this constructor)
+     * @param output
+     *            outer processor that should use this default error handler
+     * @param exceptionPolicyStrategy
+     *            strategy for onException handling
+     * @param transactionPolicy
+     *            the transaction policy; its identity hash becomes the
+     *            transaction key of this handler
+     * @param executorService
+     *            not referenced by this constructor (this handler does no
+     *            redelivery). Can be <tt>null</tt>.
+     * @param rollbackLoggingLevel
+     *            logging level to use for logging transaction rollback occurred
+     */
+    public TransactionErrorHandler(CamelContext camelContext, Processor output,
+            ExceptionPolicyStrategy exceptionPolicyStrategy, JtaTransactionPolicy transactionPolicy,
+            ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) {
+
+        this.output = output;
+        this.transactionPolicy = transactionPolicy;
+        this.rollbackLoggingLevel = rollbackLoggingLevel;
+        this.transactionKey = ObjectHelper.getIdentityHashCode(transactionPolicy);
+
+        setExceptionPolicy(exceptionPolicyStrategy);
+
+    }
+
+    /**
+     * Processes the exchange, wrapping the processing in a transaction unless
+     * the unit of work reports that it is already transacted under this
+     * handler's transaction key.
+     *
+     * @param exchange
+     *            the exchange
+     */
+    public void process(Exchange exchange) throws Exception {
+
+        // we have to run this synchronously as a JTA Transaction does *not*
+        // support using multiple threads to span a transaction
+        if (exchange.getUnitOfWork().isTransactedBy(transactionKey)) {
+            // already transacted under this handler's transaction key
+            // so lets just let the error handler process it
+            processByErrorHandler(exchange);
+        } else {
+            // not yet wrapped in transaction so lets do that
+            // and then have it invoke the error handler from within that
+            // transaction
+            processInTransaction(exchange);
+        }
+
+    }
+
+    /**
+     * Asynchronous entry point that always completes synchronously: it calls
+     * {@link #process(Exchange)}, stores anything thrown on the exchange and
+     * then invokes the callback with <tt>true</tt>.
+     *
+     * @param exchange
+     *            the exchange
+     * @param callback
+     *            notified once, with <tt>done(true)</tt>
+     * @return always <tt>true</tt>
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+
+        // invoke this synchronous method as JTA Transaction does *not*
+        // support using multiple threads to span a transaction
+        try {
+            process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        // notify callback we are done synchronously
+        callback.done(true);
+        return true;
+
+    }
+
+    /**
+     * Runs the exchange inside the transaction policy, marking the unit of
+     * work as transacted by this handler for the duration and logging begin,
+     * commit and rollback. A failure is stored on the exchange instead of
+     * being thrown out of the try block.
+     *
+     * @param exchange
+     *            the exchange
+     */
+    protected void processInTransaction(final Exchange exchange) throws Exception {
+        // is the exchange redelivered, for example JMS brokers support such
+        // details
+        Boolean externalRedelivered = exchange.isExternalRedelivered();
+        final String redelivered = externalRedelivered != null ? externalRedelivered.toString() : "unknown";
+        final String ids = ExchangeHelper.logIds(exchange);
+
+        try {
+            // mark the beginning of this transaction boundary
+            exchange.getUnitOfWork().beginTransactedBy(transactionKey);
+
+            // do in transaction
+            logTransactionBegin(redelivered, ids);
+            doInTransactionTemplate(exchange);
+            logTransactionCommit(redelivered, ids);
+
+        } catch (TransactionRolledbackException e) {
+            // do not set as exception, as it is just the marker exception that
+            // doInTransactionTemplate throws for a rollback-only exchange
+            logTransactionRollback(redelivered, ids, null, true);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            logTransactionRollback(redelivered, ids, e, false);
+        } finally {
+            // mark the end of this transaction boundary
+            exchange.getUnitOfWork().endTransactedBy(transactionKey);
+        }
+
+        // if it was a local rollback only then remove its marker so outer
+        // transaction wont see the marker
+        Boolean onlyLast = (Boolean) exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
+        if (onlyLast != null && onlyLast) {
+            // we only want this logged at debug level
+            if (log.isDebugEnabled()) {
+                // log exception if there was a cause exception so we have the
+                // stack trace
+                Exception cause = exchange.getException();
+                if (cause != null) {
+                    log.debug("Transaction rollback (" + transactionKey + ") redelivered(" + redelivered + ") for "
+                            + ids + " due exchange was marked for rollbackOnlyLast and caught: ", cause);
+                } else {
+                    log.debug(
+                            "Transaction rollback ({}) redelivered({}) for {} "
+                                    + "due exchange was marked for rollbackOnlyLast",
+                            new Object[] { transactionKey, redelivered, ids });
+                }
+            }
+            // remove the caused exception because the exchange was marked as
+            // rollback only last; with the exception removed, any outer
+            // transaction will not be affected
+            exchange.setException(null);
+        }
+    }
+
+    /**
+     * Replaces the transaction policy. The transaction key is final and stays
+     * derived from the policy that was passed to the constructor.
+     */
+    public void setTransactionPolicy(JtaTransactionPolicy transactionPolicy) {
+        this.transactionPolicy = transactionPolicy;
+    }
+
+    /**
+     * Hands the processing of the exchange to the transaction policy. The work
+     * given to the policy throws when the exchange ends up with an exception
+     * or is marked rollback-only.
+     *
+     * @param exchange
+     *            the exchange
+     * @throws Throwable
+     *             whatever {@code transactionPolicy.run} throws
+     */
+    protected void doInTransactionTemplate(final Exchange exchange) throws Throwable {
+
+        // failure is signalled to the policy by throwing from run(); policies
+        // built on TransactionalJtaTransactionPolicy.runWithTransaction roll
+        // back (or mark rollback-only) when the work throws
+        transactionPolicy.run(new JtaTransactionPolicy.Runnable() {
+
+            @Override
+            public void run() throws Throwable {
+
+                // throwable to raise if the exchange failed; despite its name
+                // it is not necessarily a runtime exception
+                Throwable rce;
+
+                // and now let process the exchange by the error handler
+                processByErrorHandler(exchange);
+
+                // after handling and still an exception or marked as rollback
+                // only then rollback
+                if (exchange.getException() != null || exchange.isRollbackOnly()) {
+
+                    // prefer the exchange's own exception when there is one
+                    if (exchange.getException() != null) {
+                        rce = exchange.getException();
+                    } else {
+                        // create marker exception to force the rollback;
+                        // processInTransaction catches it separately
+                        rce = new TransactionRolledbackException();
+                    }
+
+                    // throw to make the surrounding policy see the failure
+                    if (log.isTraceEnabled()) {
+                        log.trace("Throwing runtime exception to force transaction to rollback on {}",
+                                transactionPolicy);
+                    }
+                    throw rce;
+                }
+            }
+
+        });
+
+    }
+
+    /**
+     * Processes the {@link Exchange} using the error handler.
+     * <p/>
+     * This implementation calls the output processor synchronously; anything
+     * it throws is rethrown wrapped in a {@link RuntimeCamelException}.
+     *
+     * @param exchange
+     *            the exchange
+     */
+    protected void processByErrorHandler(final Exchange exchange) {
+
+        try {
+            output.process(exchange);
+        } catch (Throwable e) {
+            throw new RuntimeCamelException(e);
+        }
+
+    }
+
+    /**
+     * Logs the transaction begin at DEBUG level.
+     */
+    private void logTransactionBegin(String redelivered, String ids) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction begin ({}) redelivered({}) for {})",
+                    new Object[] { transactionKey, redelivered, ids });
+        }
+    }
+
+    /**
+     * Logs the transaction commit: at INFO level for a redelivered message
+     * when the rollback logging level is INFO, WARN or ERROR, otherwise at
+     * DEBUG level.
+     */
+    private void logTransactionCommit(String redelivered, String ids) {
+        if ("true".equals(redelivered)) {
+            // okay its a redelivered message so log at INFO level if
+            // rollbackLoggingLevel is INFO or higher
+            // this allows people to know that the redelivered message was
+            // committed this time
+            if (rollbackLoggingLevel == LoggingLevel.INFO || rollbackLoggingLevel == LoggingLevel.WARN
+                    || rollbackLoggingLevel == LoggingLevel.ERROR) {
+                log.info("Transaction commit ({}) redelivered({}) for {})",
+                        new Object[] { transactionKey, redelivered, ids });
+                // return after we have logged
+                return;
+            }
+        }
+
+        // log non redelivered by default at DEBUG level
+        log.debug("Transaction commit ({}) redelivered({}) for {})", new Object[] { transactionKey, redelivered, ids });
+    }
+
+    /**
+     * Logs the transaction rollback at the configured rollback logging level
+     * (nothing is logged for OFF).
+     *
+     * @param redelivered
+     *            redelivery state of the exchange, as text
+     * @param ids
+     *            the ids of the exchange used in the log line
+     * @param e
+     *            the cause; only dereferenced when <tt>rollbackOnly</tt> is
+     *            <tt>false</tt>, so it must not be <tt>null</tt> in that case
+     * @param rollbackOnly
+     *            whether the rollback is due to the exchange being marked
+     *            rollback-only rather than due to a caught failure
+     */
+    private void logTransactionRollback(String redelivered, String ids, Throwable e, boolean rollbackOnly) {
+        if (rollbackLoggingLevel == LoggingLevel.OFF) {
+            return;
+        } else if (rollbackLoggingLevel == LoggingLevel.ERROR && log.isErrorEnabled()) {
+            if (rollbackOnly) {
+                log.error("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.error("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.WARN && log.isWarnEnabled()) {
+            if (rollbackOnly) {
+                log.warn("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.warn("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.INFO && log.isInfoEnabled()) {
+            if (rollbackOnly) {
+                log.info("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.info("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.DEBUG && log.isDebugEnabled()) {
+            if (rollbackOnly) {
+                log.debug("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.debug("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        } else if (rollbackLoggingLevel == LoggingLevel.TRACE && log.isTraceEnabled()) {
+            if (rollbackOnly) {
+                log.trace("Transaction rollback ({}) redelivered({}) for {} due exchange was marked for rollbackOnly",
+                        new Object[] { transactionKey, redelivered, ids });
+            } else {
+                log.trace("Transaction rollback ({}) redelivered({}) for {} caught: {}",
+                        new Object[] { transactionKey, redelivered, ids, e.getMessage() });
+            }
+        }
+    }
+
+    public void setExceptionPolicy(ExceptionPolicyStrategy exceptionPolicy) {
+        this.exceptionPolicy = exceptionPolicy;
+    }
+
+    public ExceptionPolicyStrategy getExceptionPolicy() {
+        return exceptionPolicy;
+    }
+
+    @Override
+    public Processor getOutput() {
+        return output;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startServices(output);
+        preparingShutdown = false;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop: the output is only stopped in doShutdown(), because the error
+        // handler can be context scoped and should not stop in case a route
+        // stops
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(output);
+    }
+
+    @Override
+    public boolean supportTransacted() {
+        return true;
+    }
+
+    public boolean hasNext() {
+        return output != null;
+    }
+
+    /**
+     * Returns the output processor as a single-element list, or <tt>null</tt>
+     * when there is no output.
+     */
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<Processor>(1);
+        answer.add(output);
+        return answer;
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // only records that shutdown is being prepared; both arguments are
+        // ignored
+        log.trace("Prepare shutdown on error handler {}", this);
+        preparingShutdown = true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
new file mode 100644
index 0000000..4d17f8a
--- /dev/null
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/jta/TransactionalJtaTransactionPolicy.java
@@ -0,0 +1,121 @@
+package org.apache.camel.cdi.jta;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
+import org.apache.camel.CamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class offering the begin/commit/rollback, suspend/resume and status
+ * helpers that transaction policies can build their {@code run} method on.
+ */
+public abstract class TransactionalJtaTransactionPolicy extends JtaTransactionPolicy {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionalJtaTransactionPolicy.class);
+
+    /**
+     * Runs the work and finishes the transaction according to the outcome.
+     * If the work throws anything at all, the transaction is rolled back (when
+     * {@code isNew}) or marked rollback-only (otherwise) and the throwable is rethrown.
+     * @param runnable the work to run
+     * @param isNew {@code true} to begin a transaction before the work and
+     *            commit it afterwards; {@code false} to do neither
+     * @throws Throwable anything thrown by the work, by begin or by commit
+     */
+    protected void runWithTransaction(final Runnable runnable, final boolean isNew) throws Throwable {
+        if (isNew) {
+            begin();
+        }
+        try {
+            runnable.run();
+        } catch (Throwable e) {
+            rollback(isNew);
+            throw e;
+        }
+        if (isNew) {
+            commit();
+        }
+    }
+
+    /** Begins a new transaction on the transaction manager. */
+    private void begin() throws Exception {
+        transactionManager.begin();
+    }
+
+    /**
+     * Commits the current transaction. The JTA commit failures caught below
+     * are wrapped in a {@link CamelException}; on any other failure a
+     * rollback is attempted before the failure is rethrown.
+     */
+    private void commit() throws Exception {
+        try {
+            transactionManager.commit();
+        } catch (HeuristicMixedException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (HeuristicRollbackException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (RollbackException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (SystemException e) {
+            throw new CamelException("Unable to commit transaction", e);
+        } catch (RuntimeException e) {
+            rollback(true);
+            throw e;
+        } catch (Exception e) {
+            rollback(true);
+            throw e;
+        } catch (Error e) {
+            rollback(true);
+            throw e;
+        }
+    }
+
+    /**
+     * Rolls the transaction back, or only marks it rollback-only.
+     * Best effort: a failure to do so is logged at WARN and not propagated.
+     * @param isNew {@code true} to roll back, {@code false} to mark rollback-only
+     */
+    protected void rollback(boolean isNew) throws Exception {
+        try {
+            if (isNew) {
+                transactionManager.rollback();
+            } else {
+                transactionManager.setRollbackOnly();
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not rollback transaction!", e);
+        }
+    }
+
+    /** Suspends the current transaction and returns what the transaction manager hands back. */
+    protected Transaction suspendTransaction() throws Exception {
+        return transactionManager.suspend();
+    }
+
+    /**
+     * Resumes a previously suspended transaction; {@code null} is ignored and
+     * a failure to resume is logged at WARN rather than propagated.
+     */
+    protected void resumeTransaction(final Transaction suspendedTransaction) {
+        if (suspendedTransaction == null) {
+            return;
+        }
+        try {
+            transactionManager.resume(suspendedTransaction);
+        } catch (Throwable e) {
+            LOG.warn("Could not resume transaction!", e);
+        }
+    }
+
+    /** Returns true unless the status is STATUS_MARKED_ROLLBACK or STATUS_NO_TRANSACTION. */
+    protected boolean hasActiveTransaction() throws Exception {
+        // read the status once so that both comparisons see the same value
+        final int status = transactionManager.getStatus();
+        return status != Status.STATUS_MARKED_ROLLBACK && status != Status.STATUS_NO_TRANSACTION;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8092e89f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index df10dde..09719b1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -105,6 +105,7 @@
     <cdi-api-1.1-version>1.1</cdi-api-1.1-version>
     <cdi-api-1.2-version>1.2</cdi-api-1.2-version>
     <cdi-api-2.0-version>2.0-PFD2</cdi-api-2.0-version>
+    <jta-api-1.2-version>1.2</jta-api-1.2-version>
     <cglib-bundle-version>3.2.4_1</cglib-bundle-version>
     <cglib-version>3.2.4</cglib-version>
     <chronicle-bytes-version>1.7.35</chronicle-bytes-version>