This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 31c84c4c39 ARTEMIS-5798 MQTT QoS2 unauthorized pub ignored on retry
31c84c4c39 is described below
commit 31c84c4c39096f26ac96a91f776e3a88ca166aa9
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Mar 4 13:49:16 2026 -0600
ARTEMIS-5798 MQTT QoS2 unauthorized pub ignored on retry
---
.../core/protocol/mqtt/MQTTPublishManager.java | 7 ++---
.../integration/mqtt/PahoMQTTQOS2SecurityTest.java | 30 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 4f5fec62a5..5f8d8453f8 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -208,9 +208,6 @@ public class MQTTPublishManager {
int packetId = message.variableHeader().packetId();
boolean qos2PublishAlreadyReceived =
state.getPubRec().contains(packetId);
if (qos < 2 || !qos2PublishAlreadyReceived) {
- if (qos == 2 && !internal)
- state.getPubRec().add(packetId);
-
Transaction tx = session.getServerSession().newTransaction();
try {
AddressInfo addressInfo =
session.getServer().getAddressInfo(address);
@@ -223,6 +220,10 @@ public class MQTTPublishManager {
}
session.getServerSession().send(tx, serverMessage, true,
senderName, false);
+ if (qos == 2 && !internal) {
+ state.getPubRec().add(packetId);
+ }
+
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
boolean reset = payload instanceof EmptyByteBuf ||
payload.capacity() == 0;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index 62f95466c0..f012221764 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
@@ -42,6 +43,10 @@ public class PahoMQTTQOS2SecurityTest extends
MQTTTestSupport {
String user1 = "user1";
String password1 = "password1";
+ final String noSendUser = "noSendUser";
+ final String noSendPassword = "noSendPassword";
+ final String noSendRole = "noSendRole";
+
@Override
protected void configureBrokerSecurity(ActiveMQServer server) {
super.configureBrokerSecurity(server);
@@ -51,10 +56,14 @@ public class PahoMQTTQOS2SecurityTest extends
MQTTTestSupport {
securityManager.getConfiguration().addUser(user1, password1);
securityManager.getConfiguration().addRole(user1, "addressOnly");
+ securityManager.getConfiguration().addUser(noSendUser, noSendPassword);
+ securityManager.getConfiguration().addRole(noSendUser, noSendRole);
+
// Configure roles
HierarchicalRepository<Set<Role>> securityRepository =
server.getSecurityRepository();
Set<Role> value = new HashSet<>();
value.add(new Role("addressOnly", true, true, true, true, false, false,
false, false, true, true, false, false));
+ value.add(new Role(noSendRole, false, true, true, true, true, true,
true, true, true, true, true, true));
securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(),
server.getConfiguration().getWildcardConfiguration()), value);
}
@@ -108,6 +117,27 @@ public class PahoMQTTQOS2SecurityTest extends
MQTTTestSupport {
assertFalse(failed[0]);
}
+ @Test
+ @Timeout(60)
+ public void testSendQoS2UnauthorizedNotStorePublish() throws Exception {
+ final String clientID = "clientID";
+
+ MqttClient producer = createPahoClient(clientID);
+ MqttConnectOptions conOpt = new MqttConnectOptions();
+ conOpt.setCleanSession(false);
+ conOpt.setUserName(noSendUser);
+ conOpt.setPassword(noSendPassword.toCharArray());
+
+ producer.connect(conOpt);
+ try {
+ producer.publish(getQueueName(), "hello".getBytes(), 2, false);
+ } catch (MqttException e) {
+ // ignore
+ }
+ assertEquals(0, getSessions().get(clientID).getPubRec().size());
+ producer.close();
+ }
+
private MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient("tcp://localhost:" + getPort(), clientId, new
MemoryPersistence());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]