This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 31c84c4c39 ARTEMIS-5798 MQTT QoS2 unauthorized pub ignored on retry
31c84c4c39 is described below

commit 31c84c4c39096f26ac96a91f776e3a88ca166aa9
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Mar 4 13:49:16 2026 -0600

    ARTEMIS-5798 MQTT QoS2 unauthorized pub ignored on retry
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |  7 ++---
 .../integration/mqtt/PahoMQTTQOS2SecurityTest.java | 30 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 4f5fec62a5..5f8d8453f8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -208,9 +208,6 @@ public class MQTTPublishManager {
          int packetId = message.variableHeader().packetId();
          boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId);
          if (qos < 2 || !qos2PublishAlreadyReceived) {
-            if (qos == 2 && !internal)
-               state.getPubRec().add(packetId);
-
             Transaction tx = session.getServerSession().newTransaction();
             try {
                AddressInfo addressInfo = session.getServer().getAddressInfo(address);
@@ -223,6 +220,10 @@ public class MQTTPublishManager {
                }
                session.getServerSession().send(tx, serverMessage, true, senderName, false);
 
+               if (qos == 2 && !internal) {
+                  state.getPubRec().add(packetId);
+               }
+
                if (message.fixedHeader().isRetain()) {
                   ByteBuf payload = message.payload();
                   boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index 62f95466c0..f012221764 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
@@ -42,6 +43,10 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
    String user1 = "user1";
    String password1 = "password1";
 
+   final String noSendUser = "noSendUser";
+   final String noSendPassword = "noSendPassword";
+   final String noSendRole = "noSendRole";
+
    @Override
    protected void configureBrokerSecurity(ActiveMQServer server) {
       super.configureBrokerSecurity(server);
@@ -51,10 +56,14 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
       securityManager.getConfiguration().addUser(user1, password1);
       securityManager.getConfiguration().addRole(user1, "addressOnly");
 
+      securityManager.getConfiguration().addUser(noSendUser, noSendPassword);
+      securityManager.getConfiguration().addRole(noSendUser, noSendRole);
+
       // Configure roles
       HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
       Set<Role> value = new HashSet<>();
       value.add(new Role("addressOnly", true, true, true, true, false, false, false, false, true, true, false, false));
+      value.add(new Role(noSendRole, false, true, true, true, true, true, true, true, true, true, true, true));
 
       securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value);
    }
@@ -108,6 +117,27 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
       assertFalse(failed[0]);
    }
 
+   @Test
+   @Timeout(60)
+   public void testSendQoS2UnauthorizedNotStorePublish() throws Exception {
+      final String clientID = "clientID";
+
+      MqttClient producer = createPahoClient(clientID);
+      MqttConnectOptions conOpt = new MqttConnectOptions();
+      conOpt.setCleanSession(false);
+      conOpt.setUserName(noSendUser);
+      conOpt.setPassword(noSendPassword.toCharArray());
+
+      producer.connect(conOpt);
+      try {
+         producer.publish(getQueueName(), "hello".getBytes(), 2, false);
+      } catch (MqttException e) {
+         // ignore
+      }
+      assertEquals(0, getSessions().get(clientID).getPubRec().size());
+      producer.close();
+   }
+
    private MqttClient createPahoClient(String clientId) throws MqttException {
       return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
    }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to