alexkazan87 commented on issue #3642:
URL: https://github.com/apache/camel-quarkus/issues/3642#issuecomment-1074110332


   Yes, you are right, `_Kafka transaction does not support JTA_`. 
   
   The main problem is that I enabled Kafka transactions with the 
transaction.id property for exactly-once delivery, but the Camel code did not 
initialize the transaction before it sent any event. This is not related to 
transacted(). If the Camel team can solve this, we can move on to my other 
problem.
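
   To illustrate the ordering problem described above, here is a minimal 
sketch. It is a toy model, not the Kafka client: the class 
`ToyTransactionalProducer` and the id `money-transfer-tx` are hypothetical, and 
only the method names mirror `KafkaProducer` (`initTransactions`, 
`beginTransaction`, `commitTransaction`, `abortTransaction`). As far as I 
understand it, a transactional producer has to be initialized once before its 
first transaction, which is the step that seems to be skipped here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (NOT the Kafka client) of a transactional producer's lifecycle.
 * Only the method names mirror KafkaProducer; everything else is hypothetical.
 */
class ToyTransactionalProducer {
    private enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private final String transactionalId;
    private State state = State.UNINITIALIZED;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    ToyTransactionalProducer(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    /** Must be called exactly once, before the first transaction. */
    void initTransactions() {
        require(State.UNINITIALIZED, "initTransactions");
        state = State.READY;
    }

    void beginTransaction() {
        require(State.READY, "beginTransaction");
        state = State.IN_TRANSACTION;
    }

    /** Records are only buffered here; they become visible on commit. */
    void send(String record) {
        require(State.IN_TRANSACTION, "send");
        pending.add(record);
    }

    void commitTransaction() {
        require(State.IN_TRANSACTION, "commitTransaction");
        committed.addAll(pending);
        pending.clear();
        state = State.READY;
    }

    void abortTransaction() {
        require(State.IN_TRANSACTION, "abortTransaction");
        pending.clear();
        state = State.READY;
    }

    List<String> committedRecords() {
        return new ArrayList<>(committed);
    }

    private void require(State expected, String operation) {
        if (state != expected) {
            throw new IllegalStateException(operation + " called in state " + state
                    + " for transactional id " + transactionalId);
        }
    }

    public static void main(String[] args) {
        ToyTransactionalProducer producer = new ToyTransactionalProducer("money-transfer-tx");
        try {
            // The situation from this comment: a send without prior initialization.
            producer.send("order-1");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        // The expected order: init once, then begin / send / commit per transaction.
        producer.initTransactions();
        producer.beginTransaction();
        producer.send("order-1");
        producer.commitTransaction();
        System.out.println("Committed: " + producer.committedRecords());
    }
}
```

   In this model the first `send` is rejected because nothing was initialized, 
while the same `send` succeeds once the init and begin steps have run.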
   
   `_I assume that you want to do Kafka transaction within transacted() in 
camel?_` 
   Yes, exactly!
   
   My main goal, in the end, is to synchronise JTA (Narayana) with the Kafka 
transaction manager so that there is only one compact transaction lifecycle, 
driven through transacted(). I can manage this later by injecting an 
alternative TransactionManager based on the KafkaTransactionManager. This would 
be a unique feature, not only in Camel but also for Quarkus with Kafka in 
general.
   
   Code:
   
       @ApplicationScoped
       public class MyRoute extends RouteBuilder {

           // Events pushed from the Vert.x event bus wait here until the timer route polls them
           private static final Queue<String> messageQueue = new LinkedBlockingQueue<>();

           @Inject
           RequiresNewJtaTransactionPolicy requiresNewJtaTransactionPolicy;

           @ConsumeEvent(Constants.MONEY_TRANSFER_EVENT)
           public void pushMoneyEvent(String order) {
               messageQueue.add(order);
           }

           @Override
           public void configure() {

               onException(CamelException.class)
                       .backOffMultiplier(2)
                       .maximumRedeliveries(5)
                       .retryAttemptedLogLevel(LoggingLevel.INFO)
                       .handled(true)
                       .log("After 5 redeliveries send elk ${body}")
                       .process(processor -> {
                           //elk....
                       })
                       .end();

               from("timer:foo?period={{timer.period}}&delay={{timer.delay}}")
                       .routeId("FromTimer2Kafka")
                       .process(exchange -> {
                           // Only exchanges that carry a polled event get the NEW_EVENT header
                           if (!messageQueue.isEmpty()) {
                               String pp = messageQueue.poll();
                               exchange.getIn().setHeader("NEW_EVENT", "CREATED");
                               exchange.getIn().setBody(pp);
                           }
                       })
                       .transacted()
                       .policy(requiresNewJtaTransactionPolicy)
                       .filter(header("NEW_EVENT").isEqualTo("CREATED"))
                       .log("Before process! Event : \"${body}\" with headers \"${headers}\" ")
                       .doTry()
                           .to("kafka:{{kafka.topic.name}}")
                           .log("Message sent to the topic! : \"${body}\" ")
                       .doCatch(Exception.class)
                           .log("Exception : ${exception.message}")
                           .process(exchange -> {
                               // Rethrow as CamelException so the onException clause above applies
                               Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                               log.error("Exception:{}", cause.getMessage());
                               throw new CamelException(cause.getMessage());
                           })
                       .end();
           }
       }
   
   

