This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new e8f10dd24a Ensure AMQP protocol marshals messages before passing to 
broker (#1859)
e8f10dd24a is described below

commit e8f10dd24a26a04892fe9d7b10fd697a2e138d19
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Mar 30 12:25:56 2026 -0400

    Ensure AMQP protocol marshals messages before passing to broker (#1859)
    
    This change brings the behavior in line with other protocols and will
    prevent a future race condition during copy/dispatch as the data will
    already be marshaled.
    
    This closes #1858
---
 .../transport/amqp/protocol/AmqpReceiver.java      |  5 ++++
 .../activemq/transport/amqp/JMSClientTest.java     | 27 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 937c3a19cd..c1d218aebd 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -205,6 +205,11 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
             }
 
             message.onSend();
+            // GH-1851 - Ensure we marshal the message before passing to the 
broker
+            // This prevents a race condition later if the message is 
copied/marshaled
+            // at the same time. This behavior is in line with how messages 
are received
+            // using OpenWire (when not using VM) and Stomp.
+            message.beforeMarshall(null);
 
             sendsInFlight++;
 
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 5cc886d926..4b6e4aa63e 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -52,6 +53,10 @@ import jakarta.jms.TopicConnection;
 import jakarta.jms.TopicSession;
 import jakarta.jms.TopicSubscriber;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
@@ -1360,4 +1365,26 @@ public class JMSClientTest extends JMSClientTestSupport {
 
         assertTrue("Did not receive all messages: " + MSG_COUNT, 
done.await(15, TimeUnit.SECONDS));
     }
+
+    @Override
+    protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws 
Exception {
+        super.addAdditionalPlugins(plugins);
+        plugins.add(new BrokerPlugin() {
+
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+
+                    @Override
+                    public void send(ProducerBrokerExchange producerExchange,
+                            org.apache.activemq.command.Message messageSend) 
throws Exception {
+                        super.send(producerExchange, messageSend);
+
+                        // The message should always be passed to the broker 
in a marshaled state
+                        assertTrue(messageSend.isMarshalled());
+                    }
+                };
+            }
+        });
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to