This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 2773d95a9e9 CAMEL-20521: camel-amqp - AMQP publisher application is losing messages with local JMS transaction enabled (#13403) 2773d95a9e9 is described below commit 2773d95a9e99e9840018638beb333b59a91c2806 Author: Luigi De Masi <5583341+luigidem...@users.noreply.github.com> AuthorDate: Sun Apr 14 18:47:37 2024 +0200 CAMEL-20521: camel-amqp - AMQP publisher application is losing messages with local JMS transaction enabled (#13403) --- .../camel/component/jms/JmsConfiguration.java | 22 +- .../component/jms/RestartBrokerBeforeCommitIT.java | 263 +++++++++++++++++++++ .../infra/artemis/services/ArtemisContainer.java | 14 +- .../services/RestartAwareArtemisContainer.java | 31 +++ 4 files changed, 322 insertions(+), 8 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index 7abd8cd67a7..f32d6f9025e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -621,17 +621,23 @@ public class JmsConfiguration implements Cloneable { if (message != null && callback != null) { callback.sent(session, message, destination); } - // Check commit - avoid commit call within a JTA transaction. - if (session.getTransacted() && isSessionLocallyTransacted(session)) { - // Transacted session created by this template -> commit. - JmsUtils.commitIfNecessary(session); - } + + commitIfNecessary(session); + } finally { JmsUtils.closeMessageProducer(producer); } return null; } + protected void commitIfNecessary(Session session) throws JMSException { + // Check commit - avoid commit call within a JTA transaction. + if (session.getTransacted() && isSessionLocallyTransacted(session)) { + // Transacted session created by this template -> commit. + JmsUtils.commitIfNecessary(session); + } + } + /** * Override so we can support preserving the Qos settings that have been set on the message. */ @@ -728,7 +734,7 @@ public class JmsConfiguration implements Cloneable { } ConnectionFactory factory = getOrCreateTemplateConnectionFactory(); - JmsTemplate template = new CamelJmsTemplate(this, factory); + JmsTemplate template = createCamelJmsTemplate(factory); template.setPubSubDomain(pubSubDomain); if (destinationResolver != null) { @@ -784,6 +790,10 @@ public class JmsConfiguration implements Cloneable { return template; } + protected CamelJmsTemplate createCamelJmsTemplate(ConnectionFactory connectionFactory) { + return new CamelJmsTemplate(this, connectionFactory); + } + public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) { AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(endpoint); configureMessageListenerContainer(container, endpoint); diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java new file mode 100644 index 00000000000..4ad18c7a881 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RestartBrokerBeforeCommitIT.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Base64; +import java.util.UUID; + +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Session; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.camel.LoggingLevel; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.Registry; +import org.apache.camel.test.infra.artemis.services.RestartAwareArtemisContainer; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.camel.util.json.JsonObject; +import org.junit.jupiter.api.Test; +import org.messaginghub.pooled.jms.JmsPoolConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jms.support.JmsUtils; +import org.springframework.util.Assert; +import org.testcontainers.containers.Container; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test to prove that using org.springframework.jms.supportJmsUtils.commitIfNecessary, if a broker restart or a network + * issue happens after sending and before executing commit, the message will not be committed and the caller will not be + * notified, so the route continue with the happy path. + * + * https://issues.apache.org/jira/browse/CAMEL-20521 + * + * https://github.com/spring-projects/spring-framework/issues/32473 + * + * This issue will be fixed in spring-jms 6.1.6 + */ + +public class RestartBrokerBeforeCommitIT extends CamelTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(RestartBrokerBeforeCommitIT.class); + + protected static final String ARTEMIS_COMMAND = "/home/jboss/broker/bin/artemis"; + + protected static final String ARTEMIS_ADDRESS_NAME = "artemis-demo-topic"; + + protected static final String ARTEMIS_QUEUE_NAME = "sub1-artemis-demo-topic"; + + protected final String SERVICE_ADDRESS = "jms:topic:artemis-demo-topic?jmsMessageType=Text"; + + protected RestartAwareArtemisContainer broker; + + @Override + protected void doPreSetup() throws Exception { + broker = new RestartAwareArtemisContainer(); + broker.start(); + assertThat(broker.isRunning()).isTrue(); + } + + @Test + public void commitShouldThrowJmsException() throws Exception { + + int okMessageExpectedCount = 0; + int jmsExceptionCount = 1; + int exceptionMessageExpectedCount = 0; + + createQueue(); + + MockEndpoint okMock; + try { + String uuid = UUID.randomUUID().toString(); + template.send("seda:processMessage", ex -> { + ex.getMessage().setHeader("TMPL_uuid", uuid); + ex.getMessage().setBody("Hi!"); + }); + MockEndpoint jmsException = getMockEndpoint("mock:jmsException"); + jmsException.setExpectedMessageCount(jmsExceptionCount); + okMock = getMockEndpoint("mock:ok"); + okMock.setExpectedMessageCount(okMessageExpectedCount); + MockEndpoint genericExceptionMock = getMockEndpoint("mock:exception"); + genericExceptionMock.setExpectedMessageCount(exceptionMessageExpectedCount); + LOG.info("Asserting all mock satisfied"); + MockEndpoint.assertIsSatisfied(context, 10, SECONDS); + LOG.info("All mock satisfied"); + int actualMessageCountOnQueue = getMessageAdded(broker.adminPort()); + assertThat(actualMessageCountOnQueue).isEqualTo(0); + } finally { + broker.stop(); + } + } + + private void createQueue() throws IOException, InterruptedException { + Container.ExecResult createQueueResult = broker.execInContainer(ARTEMIS_COMMAND, + "queue", "create", + "--user=" + broker.username(), + "--password=" + broker.password(), + "--address=" + ARTEMIS_ADDRESS_NAME, + "--durable", + "--multicast", + "--preserve-on-no-consumers", + "--name=" + ARTEMIS_QUEUE_NAME, + "--auto-create-address"); + + assertThat(createQueueResult.getExitCode()).isZero(); + String err = createQueueResult.getStderr(); + String out = createQueueResult.getStdout(); + LOG.info("STD OUT: {}", out); + LOG.info("STD ERR: {}", err); + assertThat(createQueueResult.getExitCode()).isZero(); + } + + protected int getMessageAdded(int webConsolePort) throws URISyntaxException, IOException, InterruptedException { + + HttpRequest request = HttpRequest.newBuilder() + .uri(new URI( + broker.adminURL() + "/console/jolokia/read/org.apache.activemq.artemis:" + + "broker=!%22broker!%22,component=addresses,address=!%22artemis-demo-topic!%22,subcomponent=queues," + + + "routing-type=!%22multicast!%22,queue=!%22sub1-artemis-demo-topic!%22/MessagesAdded")) + .GET() + .header("Authorization", + "Basic " + Base64.getEncoder().encodeToString((broker.username() + ":" + broker.password()).getBytes())) + .build(); + HttpResponse<String> response = HttpClient.newBuilder() + .build() + .send(request, HttpResponse.BodyHandlers.ofString()); + + assertThat(response.statusCode()).isLessThan(300); + + LOG.info("Jolokia Response: ErrorCode: {} Body: {}", response.statusCode(), response.body()); + + ObjectMapper mapper = new ObjectMapper(); + JsonObject jsonObject = mapper.readValue(response.body(), JsonObject.class); + Integer value = jsonObject.getInteger("value"); + assertThat(value).isNotNull(); + return value; + } + + @Override + protected void bindToRegistry(Registry registry) { + //Coonection Factory + ActiveMQConnectionFactory factory + = new ActiveMQConnectionFactory( + "tcp://" + broker.getHost() + ":" + broker.defaultAcceptorPort(), broker.username(), broker.password()); + registry.bind("factory", factory); + + // Connection Pool + JmsPoolConnectionFactory jmsPoolConnectionFactory = new JmsPoolConnectionFactory(); + jmsPoolConnectionFactory.setConnectionFactory(factory); + jmsPoolConnectionFactory.setMaxConnections(20); + jmsPoolConnectionFactory.setMaxSessionsPerConnection(1); + jmsPoolConnectionFactory.setUseAnonymousProducers(false); + jmsPoolConnectionFactory.setConnectionCheckInterval(-1); + jmsPoolConnectionFactory.setConnectionIdleTimeout(30000); + registry.bind("connectionPool", jmsPoolConnectionFactory); + + // JMS Component and configuration that use JmsUtil.commitIfNecessary + final JmsConfiguration jmsConfiguration = getCustomJmsConfiguration(jmsPoolConnectionFactory); + registry.bind("jmsConfiguration", jmsConfiguration); + + //jms component + JmsComponent jmsComponent = new JmsComponent(); + jmsComponent.setConfiguration(jmsConfiguration); + registry.bind("jms", jmsComponent); + } + + private CustomJmsConfiguration getCustomJmsConfiguration(JmsPoolConnectionFactory jmsPoolConnectionFactory) { + CustomJmsConfiguration jmsConfiguration = new CustomJmsConfiguration(); + jmsConfiguration.setConnectionFactory(jmsPoolConnectionFactory); + jmsConfiguration.setTransacted(true); + jmsConfiguration.setLazyCreateTransactionManager(true); + jmsConfiguration.setDeliveryPersistent(true); + jmsConfiguration.setRequestTimeout(10000); + jmsConfiguration.setReceiveTimeout(1000); + jmsConfiguration.setCacheLevelName("CACHE_NONE"); + jmsConfiguration.setAcknowledgementModeName("SESSION_TRANSACTED"); + return jmsConfiguration; + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("seda:processMessage") + .to(SERVICE_ADDRESS) + .to("mock:ok") + .log(LoggingLevel.INFO, "MESSAGE SENT: ${header.TMPL_uuid}") + + .onException(JMSException.class) + .id("onJMSException") + .handled(true) + .maximumRedeliveries(0) + .logStackTrace(false) + .log(LoggingLevel.ERROR, + "MESSAGE Failed: ${header.TMPL_uuid} \n***** JMSException received *****: ${exception.message}") + .to("mock:jmsException") + .end() + .onException(Throwable.class) + .id("onException") + .handled(true) + .maximumRedeliveries(0) + .to("mock:exception") + .end(); + } + }; + } + + // CUSTOM JMS CONFIGURATION + class CustomJmsConfiguration extends JmsConfiguration { + @Override + protected CamelJmsTemplate createCamelJmsTemplate(ConnectionFactory connectionFactory) { + return new CustomCamelJmsTemplate(this, connectionFactory); + } + } + + // CUSTOM JMS TEMPLATE + class CustomCamelJmsTemplate extends JmsConfiguration.CamelJmsTemplate { + + public CustomCamelJmsTemplate(JmsConfiguration config, ConnectionFactory connectionFactory) { + super(config, connectionFactory); + } + + @Override + protected void commitIfNecessary(Session session) throws JMSException { + // Transacted session created by this template -> commit. + Assert.notNull(session, "Session must not be null"); + + // Check commit - avoid commit call within a JTA transaction. + if (session.getTransacted() && isSessionLocallyTransacted(session)) { + broker.restart(); + // Transacted session created by this template -> commit. + JmsUtils.commitIfNecessary(session); + } + } + } +} diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java index 8cf6a82cec0..599746800b7 100644 --- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisContainer.java @@ -28,14 +28,16 @@ public class ArtemisContainer extends GenericContainer<ArtemisContainer> impleme private static final int DEFAULT_AMQP_PORT = 5672; private static final int DEFAULT_ADMIN_PORT = 8161; private static final int DEFAULT_ACCEPTOR_PORT = 61616; + private static final String DEFAULT_USERNAME = "admin"; + private static final String DEFAULT_PASSWORD = "admin"; public ArtemisContainer() { super(DockerImageName .parse(LocalPropertyResolver.getProperty(ArtemisContainer.class, ArtemisProperties.ARTEMIS_CONTAINER))); this.withEnv("AMQ_EXTRA_ARGS", "--relax-jolokia") - .withEnv("AMQ_USER", "admin") - .withEnv("AMQ_PASSWORD", "admin") + .withEnv("AMQ_USER", DEFAULT_USERNAME) + .withEnv("AMQ_PASSWORD", DEFAULT_PASSWORD) .withExposedPorts(DEFAULT_MQTT_PORT, DEFAULT_AMQP_PORT, DEFAULT_ADMIN_PORT, DEFAULT_ACCEPTOR_PORT) .waitingFor(Wait.forListeningPort()); @@ -130,4 +132,12 @@ public class ArtemisContainer extends GenericContainer<ArtemisContainer> impleme public String getOpenwireEndpoint() { return String.format("tcp://%s:%d", getHost(), openwirePort()); } + + public String username() { + return DEFAULT_USERNAME; + } + + public String password() { + return DEFAULT_PASSWORD; + } } diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java new file mode 100644 index 00000000000..2cfff9a661a --- /dev/null +++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/RestartAwareArtemisContainer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.artemis.services; + +public class RestartAwareArtemisContainer extends ArtemisContainer { + + public void restart() { + String tag = this.getContainerId(); + dockerClient.commitCmd(this.getContainerId()) + .withRepository("tempimg") + .withTag(tag).exec(); + this.stop(); + this.setDockerImageName("tempimg:" + tag); + this.start(); + } + +}