gmagnotta opened a new issue #2815:
URL: https://github.com/apache/camel-quarkus/issues/2815


   Hi,
   
   I'm trying to use an XA transaction with both org.postgresql.xa.PGXADataSource and the ActiveMQ component in camel-quarkus, but I get the following exception:
   
   ```
   Exception occurred during route processing: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
          at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311)
          at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185)
          at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507)
          at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:552)
          at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:445)
          at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:398)
          at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)
          at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169)
          at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
          at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
          at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:286)
          at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:90)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleSync(DefaultReactiveExecutor.java:65)
          at org.apache.camel.processor.Pipeline.process(Pipeline.java:145)
          at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
          at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41)
          at org.apache.camel.jta.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:233)
          at org.apache.camel.jta.TransactionErrorHandler$1.run(TransactionErrorHandler.java:196)
          at org.apache.camel.quarkus.component.jta.TransactionalJtaTransactionPolicy.runWithTransaction(TransactionalJtaTransactionPolicy.java:48)
          at org.apache.camel.quarkus.component.jta.RequiredJtaTransactionPolicy.run(RequiredJtaTransactionPolicy.java:26)
          at org.apache.camel.jta.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:186)
          at org.apache.camel.jta.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:137)
          at org.apache.camel.jta.TransactionErrorHandler.process(TransactionErrorHandler.java:101)
          at org.apache.camel.jta.TransactionErrorHandler.process(TransactionErrorHandler.java:110)
          at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.doRun(RedeliveryErrorHandler.java:714)
          at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:623)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:55)
          at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.lambda$doRun$1(RedeliveryErrorHandler.java:717)
          at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:87)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
   Caused by: javax.jms.JMSException: Session's XAResource has not been enlisted in a distributed transaction.
          at org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:101)
          at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1919)
          at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:288)
          at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:223)
          at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
          at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:634)
          at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:657)
          at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:597)
          at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.lambda$send$0(JmsConfiguration.java:554)
          at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504)
   ```
   
   The Route is the following:
   
   ```
   package org.gmagnotta.app.route;
   
   import javax.enterprise.context.ApplicationScoped;
   import javax.inject.Inject;
   import javax.transaction.TransactionManager;
   import javax.transaction.UserTransaction;
   
   import org.apache.activemq.ActiveMQXAConnectionFactory;
   import org.apache.camel.Component;
   import org.apache.camel.builder.RouteBuilder;
   import org.apache.camel.component.activemq.ActiveMQComponent;
   import org.apache.camel.converter.jaxb.JaxbDataFormat;
   import org.eclipse.microprofile.config.inject.ConfigProperty;
   import org.gmagnotta.model.Order;
   import org.jboss.logging.Logger;
   import org.springframework.transaction.jta.JtaTransactionManager;
   
   @ApplicationScoped
   public class XmlFileRoute extends RouteBuilder {
   
       private static final Logger LOGGER = Logger.getLogger(XmlFileRoute.class);
   
       @ConfigProperty(name = "orders.dir")
       String ordersDirectory;
       
       @ConfigProperty(name = "broker.url")
       String brokerUrl;
       
       @Inject
       TransactionManager transactionManager;
       
       @Inject
       UserTransaction userTransaction;
       
       @Override
       public void configure() throws Exception {

           JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
           getContext().getRegistry().bind("jtaTransactionManager", jtaTransactionManager);

           ActiveMQXAConnectionFactory activeMQConectionFactory = new ActiveMQXAConnectionFactory(brokerUrl);

           ActiveMQComponent activeMq = new ActiveMQComponent();
           activeMq.setConnectionFactory(activeMQConectionFactory);
           activeMq.setTransacted(true);
           activeMq.setTransactionManager(jtaTransactionManager);

           getContext().addComponent("activemq", activeMq);

           JaxbDataFormat jaxbDataFormat = new JaxbDataFormat("org.gmagnotta.jaxb");
           jaxbDataFormat.setSchema("classpath:order.xsd");

           onException(Exception.class)
               .log("Exception occurred during route processing: ${exception.stacktrace}")
               .markRollbackOnly();

           from("file:" + ordersDirectory
                   + "?moveFailed=.error&readLock=markerFile&readLockTimeout=100&readLockCheckInterval=20&maxMessagesPerPoll=3")
               .routeId("fileRoute")
               .log("Consuming file ${headers.CamelFileAbsolutePath}")
               .to("direct:orderprocessing");

           from("direct:orderprocessing")
               .routeId("orderProcessing")
               .threads(3, 3, "fileProcessor")
               .unmarshal(jaxbDataFormat)
               .transacted()
                   .to("bean://orderprocessor")
                   .setBody(simple("${headers.OrderEntityId}"))
                   .setBody(constant("Test"))
                   .to("activemq:queue:ordercreated")
                   .throwException(new Exception("test xa rollback"))
               .end()
               .log("done processing order ${headers.OrderEntityId}");
   
       }
   }
   ```
   
   Inside the orderprocessor bean I persist an entity to the database, and then I try to send its id over JMS.
   
   Whether the transacted block completes successfully or with an exception (as in this example), I always get the same exception.
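
To make the failure mode concrete, here is a minimal sketch of the precondition that the error message describes. It is a toy model using only the JDK's `javax.transaction.xa` interfaces, not ActiveMQ's or Narayana's code: `ToyXaSession`, `ToyXid` and `XaEnlistmentSketch` are hypothetical names, and the assumption is simply that an XA session rejects `send` unless a transaction manager has first called `start` on its `XAResource`.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaEnlistmentSketch {
    /** Returns true if send() is rejected when the session was never enlisted. */
    static boolean sendWithoutEnlistmentFails() {
        ToyXaSession session = new ToyXaSession();
        try {
            session.send("Test");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Plays the transaction manager's role: start, work, end, then commit or roll back. */
    static List<String> sendInsideTransaction(boolean commit) {
        ToyXaSession session = new ToyXaSession();
        Xid xid = new ToyXid(1);
        session.start(xid, XAResource.TMNOFLAGS); // enlistment: associate session with xid
        session.send("Test");
        session.end(xid, XAResource.TMSUCCESS);   // delistment
        if (commit) {
            session.prepare(xid);
            session.commit(xid, false);
        } else {
            session.rollback(xid);
        }
        return session.delivered;
    }

    public static void main(String[] args) {
        System.out.println("send without enlistment fails: " + sendWithoutEnlistmentFails());
        System.out.println("delivered after commit: " + sendInsideTransaction(true));
        System.out.println("delivered after rollback: " + sendInsideTransaction(false));
    }
}

/** Toy stand-in for an XA-capable JMS session (hypothetical, not ActiveMQ's implementation). */
class ToyXaSession implements XAResource {
    private Xid current;                                   // non-null only while enlisted
    private final List<String> pending = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    void send(String body) {
        if (current == null) {
            throw new IllegalStateException(
                "XAResource has not been enlisted in a distributed transaction");
        }
        pending.add(body); // held back until the transaction outcome is known
    }

    @Override public void start(Xid xid, int flags) { current = xid; }
    @Override public void end(Xid xid, int flags) { current = null; }
    @Override public int prepare(Xid xid) { return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) {
        delivered.addAll(pending);
        pending.clear();
    }
    @Override public void rollback(Xid xid) { pending.clear(); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

/** Hypothetical transaction id; a real transaction manager supplies its own Xid. */
class ToyXid implements Xid {
    private final byte id;
    ToyXid(int id) { this.id = (byte) id; }
    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return new byte[] { id }; }
    @Override public byte[] getBranchQualifier() { return new byte[] { 0 }; }
}
```

In this model the first call fails because nothing ever invoked `start` on the session's resource, which is how I read the message in the stack trace above. Whether that is really what happens with the plain `ActiveMQXAConnectionFactory` in my route is exactly what I am unsure about.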
   
   Any suggestions? There seems to be a similar problem in https://github.com/quarkusio/quarkus/issues/5762
   
   Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

