This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new e8f10dd24a Ensure AMQP protocol marshals messages before passing to
broker (#1859)
e8f10dd24a is described below
commit e8f10dd24a26a04892fe9d7b10fd697a2e138d19
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Mar 30 12:25:56 2026 -0400
Ensure AMQP protocol marshals messages before passing to broker (#1859)
This change brings the behavior in line with other protocols and will
prevent a future race condition during copy/dispatch as the data will
already be marshaled.
This closes #1858
---
.../transport/amqp/protocol/AmqpReceiver.java | 5 ++++
.../activemq/transport/amqp/JMSClientTest.java | 27 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 937c3a19cd..c1d218aebd 100644
---
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -205,6 +205,11 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
}
message.onSend();
+ // GH-1851 - Ensure we marshal the message before passing to the
broker
+ // This prevents a race condition later if the message is
copied/marshaled
+ // at this same time. This behavior is in line with how messages
are received
+ // using OpenWire (when not using VM) and Stomp.
+ message.beforeMarshall(null);
sendsInFlight++;
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 5cc886d926..4b6e4aa63e 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,10 @@ import jakarta.jms.TopicConnection;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
@@ -1360,4 +1365,26 @@ public class JMSClientTest extends JMSClientTestSupport {
assertTrue("Did not receive all messages: " + MSG_COUNT,
done.await(15, TimeUnit.SECONDS));
}
+
+ @Override
+ protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws
Exception {
+ super.addAdditionalPlugins(plugins);
+ plugins.add(new BrokerPlugin() {
+
+ @Override
+ public Broker installPlugin(Broker broker) {
+ return new BrokerFilter(broker) {
+
+ @Override
+ public void send(ProducerBrokerExchange producerExchange,
+ org.apache.activemq.command.Message messageSend)
throws Exception {
+ super.send(producerExchange, messageSend);
+
+ // The message should always be passed to the broker
in a marshaled state
+ assertTrue(messageSend.isMarshalled());
+ }
+ };
+ }
+ });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact