This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2773d95a9e9 CAMEL-20521: camel-amqp - AMQP publisher application is 
losing messages with local JMS transaction enabled (#13403)
2773d95a9e9 is described below

commit 2773d95a9e99e9840018638beb333b59a91c2806
Author: Luigi De Masi <5583341+luigidem...@users.noreply.github.com>
AuthorDate: Sun Apr 14 18:47:37 2024 +0200

    CAMEL-20521: camel-amqp - AMQP publisher application is losing messages 
with local JMS transaction enabled (#13403)
---
 .../camel/component/jms/JmsConfiguration.java      |  22 +-
 .../component/jms/RestartBrokerBeforeCommitIT.java | 263 +++++++++++++++++++++
 .../infra/artemis/services/ArtemisContainer.java   |  14 +-
 .../services/RestartAwareArtemisContainer.java     |  31 +++
 4 files changed, 322 insertions(+), 8 deletions(-)

diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 7abd8cd67a7..f32d6f9025e 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -621,17 +621,23 @@ public class JmsConfiguration implements Cloneable {
                 if (message != null && callback != null) {
                     callback.sent(session, message, destination);
                 }
-                // Check commit - avoid commit call within a JTA transaction.
-                if (session.getTransacted() && 
isSessionLocallyTransacted(session)) {
-                    // Transacted session created by this template -> commit.
-                    JmsUtils.commitIfNecessary(session);
-                }
+
+                commitIfNecessary(session);
+
             } finally {
                 JmsUtils.closeMessageProducer(producer);
             }
             return null;
         }
 
+        protected void commitIfNecessary(Session session) throws JMSException {
+            // Check commit - avoid commit call within a JTA transaction.
+            if (session.getTransacted() && 
isSessionLocallyTransacted(session)) {
+                // Transacted session created by this template -> commit.
+                JmsUtils.commitIfNecessary(session);
+            }
+        }
+
         /**
          * Override so we can support preserving the Qos settings that have 
been set on the message.
          */
@@ -728,7 +734,7 @@ public class JmsConfiguration implements Cloneable {
         }
 
         ConnectionFactory factory = getOrCreateTemplateConnectionFactory();
-        JmsTemplate template = new CamelJmsTemplate(this, factory);
+        JmsTemplate template = createCamelJmsTemplate(factory);
 
         template.setPubSubDomain(pubSubDomain);
         if (destinationResolver != null) {
@@ -784,6 +790,10 @@ public class JmsConfiguration implements Cloneable {
         return template;
     }
 
+    protected CamelJmsTemplate createCamelJmsTemplate(ConnectionFactory 
connectionFactory) {
+        return new CamelJmsTemplate(this, connectionFactory);
+    }
+
     public AbstractMessageListenerContainer 
createMessageListenerContainer(JmsEndpoint endpoint) {
         AbstractMessageListenerContainer container = 
chooseMessageListenerContainerImplementation(endpoint);
         configureMessageListenerContainer(container, endpoint);
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java
new file mode 100644
index 00000000000..4ad18c7a881
--- /dev/null
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Base64;
+import java.util.UUID;
+
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.JMSException;
+import jakarta.jms.Session;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.Registry;
+import 
org.apache.camel.test.infra.artemis.services.RestartAwareArtemisContainer;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.util.json.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.support.JmsUtils;
+import org.springframework.util.Assert;
+import org.testcontainers.containers.Container;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test to prove that using 
org.springframework.jms.support.JmsUtils.commitIfNecessary, if a broker restart 
or a network
+ * issue happens after sending and before executing commit, the message will 
not be committed and the caller will not be
+ * notified, so the route continues with the happy path.
+ *
+ * https://issues.apache.org/jira/browse/CAMEL-20521
+ *
+ * https://github.com/spring-projects/spring-framework/issues/32473
+ *
+ * This issue will be fixed in spring-jms 6.1.6
+ */
+
+public class RestartBrokerBeforeCommitIT extends CamelTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RestartBrokerBeforeCommitIT.class);
+
+    protected static final String ARTEMIS_COMMAND = 
"/home/jboss/broker/bin/artemis";
+
+    protected static final String ARTEMIS_ADDRESS_NAME = "artemis-demo-topic";
+
+    protected static final String ARTEMIS_QUEUE_NAME = 
"sub1-artemis-demo-topic";
+
+    protected final String SERVICE_ADDRESS = 
"jms:topic:artemis-demo-topic?jmsMessageType=Text";
+
+    protected RestartAwareArtemisContainer broker;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        broker = new RestartAwareArtemisContainer();
+        broker.start();
+        assertThat(broker.isRunning()).isTrue();
+    }
+
+    @Test
+    public void commitShouldThrowJmsException() throws Exception {
+
+        int okMessageExpectedCount = 0;
+        int jmsExceptionCount = 1;
+        int exceptionMessageExpectedCount = 0;
+
+        createQueue();
+
+        MockEndpoint okMock;
+        try {
+            String uuid = UUID.randomUUID().toString();
+            template.send("seda:processMessage", ex -> {
+                ex.getMessage().setHeader("TMPL_uuid", uuid);
+                ex.getMessage().setBody("Hi!");
+            });
+            MockEndpoint jmsException = getMockEndpoint("mock:jmsException");
+            jmsException.setExpectedMessageCount(jmsExceptionCount);
+            okMock = getMockEndpoint("mock:ok");
+            okMock.setExpectedMessageCount(okMessageExpectedCount);
+            MockEndpoint genericExceptionMock = 
getMockEndpoint("mock:exception");
+            
genericExceptionMock.setExpectedMessageCount(exceptionMessageExpectedCount);
+            LOG.info("Asserting all mock satisfied");
+            MockEndpoint.assertIsSatisfied(context, 10, SECONDS);
+            LOG.info("All mock satisfied");
+            int actualMessageCountOnQueue = 
getMessageAdded(broker.adminPort());
+            assertThat(actualMessageCountOnQueue).isEqualTo(0);
+        } finally {
+            broker.stop();
+        }
+    }
+
+    private void createQueue() throws IOException, InterruptedException {
+        Container.ExecResult createQueueResult = 
broker.execInContainer(ARTEMIS_COMMAND,
+                "queue", "create",
+                "--user=" + broker.username(),
+                "--password=" + broker.password(),
+                "--address=" + ARTEMIS_ADDRESS_NAME,
+                "--durable",
+                "--multicast",
+                "--preserve-on-no-consumers",
+                "--name=" + ARTEMIS_QUEUE_NAME,
+                "--auto-create-address");
+
+        assertThat(createQueueResult.getExitCode()).isZero();
+        String err = createQueueResult.getStderr();
+        String out = createQueueResult.getStdout();
+        LOG.info("STD OUT: {}", out);
+        LOG.info("STD ERR: {}", err);
+        assertThat(createQueueResult.getExitCode()).isZero();
+    }
+
+    protected int getMessageAdded(int webConsolePort) throws 
URISyntaxException, IOException, InterruptedException {
+
+        HttpRequest request = HttpRequest.newBuilder()
+                .uri(new URI(
+                        broker.adminURL() + 
"/console/jolokia/read/org.apache.activemq.artemis:" +
+                             
"broker=!%22broker!%22,component=addresses,address=!%22artemis-demo-topic!%22,subcomponent=queues,"
+                             +
+                             
"routing-type=!%22multicast!%22,queue=!%22sub1-artemis-demo-topic!%22/MessagesAdded"))
+                .GET()
+                .header("Authorization",
+                        "Basic " + 
Base64.getEncoder().encodeToString((broker.username() + ":" + 
broker.password()).getBytes()))
+                .build();
+        HttpResponse<String> response = HttpClient.newBuilder()
+                .build()
+                .send(request, HttpResponse.BodyHandlers.ofString());
+
+        assertThat(response.statusCode()).isLessThan(300);
+
+        LOG.info("Jolokia Response: ErrorCode: {} Body: {}", 
response.statusCode(), response.body());
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonObject jsonObject = mapper.readValue(response.body(), 
JsonObject.class);
+        Integer value = jsonObject.getInteger("value");
+        assertThat(value).isNotNull();
+        return value;
+    }
+
+    @Override
+    protected void bindToRegistry(Registry registry) {
+        // Connection Factory
+        ActiveMQConnectionFactory factory
+                = new ActiveMQConnectionFactory(
+                        "tcp://" + broker.getHost() + ":" + 
broker.defaultAcceptorPort(), broker.username(), broker.password());
+        registry.bind("factory", factory);
+
+        // Connection Pool
+        JmsPoolConnectionFactory jmsPoolConnectionFactory = new 
JmsPoolConnectionFactory();
+        jmsPoolConnectionFactory.setConnectionFactory(factory);
+        jmsPoolConnectionFactory.setMaxConnections(20);
+        jmsPoolConnectionFactory.setMaxSessionsPerConnection(1);
+        jmsPoolConnectionFactory.setUseAnonymousProducers(false);
+        jmsPoolConnectionFactory.setConnectionCheckInterval(-1);
+        jmsPoolConnectionFactory.setConnectionIdleTimeout(30000);
+        registry.bind("connectionPool", jmsPoolConnectionFactory);
+
+        // JMS Component and configuration that use JmsUtils.commitIfNecessary
+        final JmsConfiguration jmsConfiguration = 
getCustomJmsConfiguration(jmsPoolConnectionFactory);
+        registry.bind("jmsConfiguration", jmsConfiguration);
+
+        //jms component
+        JmsComponent jmsComponent = new JmsComponent();
+        jmsComponent.setConfiguration(jmsConfiguration);
+        registry.bind("jms", jmsComponent);
+    }
+
+    private CustomJmsConfiguration 
getCustomJmsConfiguration(JmsPoolConnectionFactory jmsPoolConnectionFactory) {
+        CustomJmsConfiguration jmsConfiguration = new CustomJmsConfiguration();
+        jmsConfiguration.setConnectionFactory(jmsPoolConnectionFactory);
+        jmsConfiguration.setTransacted(true);
+        jmsConfiguration.setLazyCreateTransactionManager(true);
+        jmsConfiguration.setDeliveryPersistent(true);
+        jmsConfiguration.setRequestTimeout(10000);
+        jmsConfiguration.setReceiveTimeout(1000);
+        jmsConfiguration.setCacheLevelName("CACHE_NONE");
+        jmsConfiguration.setAcknowledgementModeName("SESSION_TRANSACTED");
+        return jmsConfiguration;
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("seda:processMessage")
+                        .to(SERVICE_ADDRESS)
+                        .to("mock:ok")
+                        .log(LoggingLevel.INFO, "MESSAGE SENT: 
${header.TMPL_uuid}")
+
+                        .onException(JMSException.class)
+                        .id("onJMSException")
+                        .handled(true)
+                        .maximumRedeliveries(0)
+                        .logStackTrace(false)
+                        .log(LoggingLevel.ERROR,
+                                "MESSAGE Failed: ${header.TMPL_uuid} \n***** 
JMSException received *****: ${exception.message}")
+                        .to("mock:jmsException")
+                        .end()
+                        .onException(Throwable.class)
+                        .id("onException")
+                        .handled(true)
+                        .maximumRedeliveries(0)
+                        .to("mock:exception")
+                        .end();
+            }
+        };
+    }
+
+    // CUSTOM JMS CONFIGURATION
+    class CustomJmsConfiguration extends JmsConfiguration {
+        @Override
+        protected CamelJmsTemplate createCamelJmsTemplate(ConnectionFactory 
connectionFactory) {
+            return new CustomCamelJmsTemplate(this, connectionFactory);
+        }
+    }
+
+    // CUSTOM JMS TEMPLATE
+    class CustomCamelJmsTemplate extends JmsConfiguration.CamelJmsTemplate {
+
+        public CustomCamelJmsTemplate(JmsConfiguration config, 
ConnectionFactory connectionFactory) {
+            super(config, connectionFactory);
+        }
+
+        @Override
+        protected void commitIfNecessary(Session session) throws JMSException {
+            // Transacted session created by this template -> commit.
+            Assert.notNull(session, "Session must not be null");
+
+            // Check commit - avoid commit call within a JTA transaction.
+            if (session.getTransacted() && 
isSessionLocallyTransacted(session)) {
+                broker.restart();
+                // Transacted session created by this template -> commit.
+                JmsUtils.commitIfNecessary(session);
+            }
+        }
+    }
+}
diff --git 
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java
 
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java
index 8cf6a82cec0..599746800b7 100644
--- 
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java
+++ 
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java
@@ -28,14 +28,16 @@ public class ArtemisContainer extends 
GenericContainer<ArtemisContainer> impleme
     private static final int DEFAULT_AMQP_PORT = 5672;
     private static final int DEFAULT_ADMIN_PORT = 8161;
     private static final int DEFAULT_ACCEPTOR_PORT = 61616;
+    private static final String DEFAULT_USERNAME = "admin";
+    private static final String DEFAULT_PASSWORD = "admin";
 
     public ArtemisContainer() {
         super(DockerImageName
                 
.parse(LocalPropertyResolver.getProperty(ArtemisContainer.class, 
ArtemisProperties.ARTEMIS_CONTAINER)));
 
         this.withEnv("AMQ_EXTRA_ARGS", "--relax-jolokia")
-                .withEnv("AMQ_USER", "admin")
-                .withEnv("AMQ_PASSWORD", "admin")
+                .withEnv("AMQ_USER", DEFAULT_USERNAME)
+                .withEnv("AMQ_PASSWORD", DEFAULT_PASSWORD)
                 .withExposedPorts(DEFAULT_MQTT_PORT, DEFAULT_AMQP_PORT,
                         DEFAULT_ADMIN_PORT, DEFAULT_ACCEPTOR_PORT)
                 .waitingFor(Wait.forListeningPort());
@@ -130,4 +132,12 @@ public class ArtemisContainer extends 
GenericContainer<ArtemisContainer> impleme
     public String getOpenwireEndpoint() {
         return String.format("tcp://%s:%d", getHost(), openwirePort());
     }
+
+    public String username() {
+        return DEFAULT_USERNAME;
+    }
+
+    public String password() {
+        return DEFAULT_PASSWORD;
+    }
 }
diff --git 
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java
 
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java
new file mode 100644
index 00000000000..2cfff9a661a
--- /dev/null
+++ 
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.infra.artemis.services;
+
+public class RestartAwareArtemisContainer extends ArtemisContainer {
+
+    public void restart() {
+        String tag = this.getContainerId();
+        dockerClient.commitCmd(this.getContainerId())
+                .withRepository("tempimg")
+                .withTag(tag).exec();
+        this.stop();
+        this.setDockerImageName("tempimg:" + tag);
+        this.start();
+    }
+
+}

Reply via email to