This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c8d7f8acf5 ARTEMIS-4924 handle invalid messages in SNF queue
c8d7f8acf5 is described below

commit c8d7f8acf57ba7f1cc3cef2b80905e347f2f03c0
Author: Howard Gao <[email protected]>
AuthorDate: Tue Jul 16 16:31:36 2024 +0800

    ARTEMIS-4924 handle invalid messages in SNF queue
    
      - Send invalid messages in SNF queue to DLA
      - Add documentation for proper store-and-forward queue usage
---
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +
 .../artemis/core/server/ActiveMQServerLogger.java  |  10 +-
 .../core/server/cluster/impl/BridgeImpl.java       |  14 +-
 .../cluster/impl/ClusterConnectionBridge.java      |   8 +-
 docs/user-manual/clusters.adoc                     |  15 +++
 .../bridge/ClusteredBridgeReconnectTest.java       | 148 ++++++++++++++++++++-
 6 files changed, 179 insertions(+), 19 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 014616d8fa..4a81abaaf2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -535,4 +535,7 @@ public interface ActiveMQMessageBundle {
    @Message(id = 229257, value = "IDGenerator has been stopped")
    RuntimeException idGeneratorStopped();
 
+   @Message(id = 229258, value = "Invalid cluster bridge message! No queue IDs 
defined in the property {}")
+   ActiveMQIllegalStateException noQueueIdsDefined(SimpleString idsHeaderName);
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1a8253d5de..6a139815b1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -52,7 +52,7 @@ import 
org.apache.activemq.artemis.spi.core.remoting.Connection;
 /**
  * Logger Codes 220000 - 228999
  */
-@LogBundle(projectCode = "AMQ", regexID = "22[0-8][0-9]{3}", retiredIDs = 
{221026, 221052, 222003, 222012, 222015, 222020, 222021, 222022, 222024, 
222027, 222028, 222029, 222048, 222052, 222058, 222064, 222071, 222078, 222079, 
222083, 222084, 222088, 222090, 222102, 222105, 222128, 222134, 222135, 222152, 
222159, 222163, 222167, 222170, 222171, 222182, 222190, 222192, 222193, 222204, 
222252, 222255, 222257, 222259, 222260, 222276, 222277, 222288, 224001, 224002, 
224003, 224005, 224013, 2 [...]
+@LogBundle(projectCode = "AMQ", regexID = "22[0-8][0-9]{3}", retiredIDs = 
{221026, 221052, 222003, 222012, 222015, 222020, 222021, 222022, 222024, 
222027, 222028, 222029, 222048, 222052, 222058, 222064, 222071, 222078, 222079, 
222083, 222084, 222088, 222090, 222102, 222105, 222110, 222128, 222134, 222135, 
222152, 222159, 222163, 222167, 222170, 222171, 222182, 222190, 222192, 222193, 
222204, 222252, 222255, 222257, 222259, 222260, 222276, 222277, 222288, 224001, 
224002, 224003, 224005, 2 [...]
 public interface ActiveMQServerLogger {
 
    // Note: logger ID 224127 uses 
"org.apache.activemq.artemis.core.server.Queue" for its logger category, rather 
than ActiveMQServerLogger.class.getPackage().getName()
@@ -577,11 +577,6 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222109, value = "Timed out waiting for write lock on 
consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
    void timeoutLockingConsumer(String consumer, String remoteAddress);
 
-   @LogMessage(id = 222110, value = "no queue IDs defined!,  originalMessage  
= {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
-   void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
-                         org.apache.activemq.artemis.api.core.Message 
messageCopy,
-                         SimpleString idsHeaderName);
-
    @LogMessage(id = 222111, value = "exception while invoking {} on {}", level 
= LogMessage.Level.TRACE)
    void managementOperationError(String op, String resourceName, Exception e);
 
@@ -1518,4 +1513,7 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224150, value = "Messages will be dropped on address {} / 
queue {}. Queue is disabled.", level = LogMessage.Level.INFO)
    void noRouteMessagesWillBeDropped(SimpleString addressName, SimpleString 
queueName);
+
+   @LogMessage(id = 224151, value = "Bridge {} unable to handle message: {}. 
Root cause: {}", level = LogMessage.Level.INFO)
+   void bridgeUnableToHandleMessage(SimpleString bridgeName, String message, 
String exceptionMessage);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c52e4eab2b..0c52679a7a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -529,7 +529,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    /**
     * Hook for processing message before forwarding
     */
-   protected Message beforeForward(Message message, final SimpleString 
forwardingAddress) {
+   protected Message beforeForward(Message message, final SimpleString 
forwardingAddress) throws ActiveMQException {
       message = message.copy();
       ((RefCountMessage)message).setParentRef((RefCountMessage)message);
 
@@ -630,7 +630,17 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
                dest = ref.getMessage().getAddressSimpleString();
             }
 
-            final Message message = beforeForward(ref.getMessage(), dest);
+            final Message message;
+            try {
+               message = beforeForward(ref.getMessage(), dest);
+            } catch (ActiveMQException ex) {
+               
ActiveMQServerLogger.LOGGER.bridgeUnableToHandleMessage(getName(), 
ref.getMessage().toString(), ex.getMessage());
+               ref.getQueue().sendToDeadLetterAddress(null, ref);
+               synchronized (refs) {
+                  refs.remove(ref.getMessageID());
+               }
+               return HandleStatus.HANDLED;
+            }
 
             pendingAcks.countUp();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 64235fdd25..93e5314e32 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -47,6 +47,7 @@ import 
org.apache.activemq.artemis.core.config.TransformerConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
@@ -185,7 +186,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
    }
 
    @Override
-   protected Message beforeForward(final Message message, final SimpleString 
forwardingAddress) {
+   protected Message beforeForward(final Message message, final SimpleString 
forwardingAddress) throws ActiveMQException {
       // We make a copy of the message, then we strip out the unwanted routing 
id headers and leave
       // only
       // the one pertinent for the address node - this is important since 
different queues on different
@@ -200,11 +201,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
       Set<SimpleString> propNames = new 
HashSet<>(messageCopy.getPropertyNames());
 
       byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
-
       if (queueIds == null) {
-         // Sanity check only
-         ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, 
idsHeaderName);
-         throw new IllegalStateException("no queueIDs defined");
+         throw ActiveMQMessageBundle.BUNDLE.noQueueIdsDefined(idsHeaderName);
       }
 
       for (SimpleString propName : propNames) {
diff --git a/docs/user-manual/clusters.adoc b/docs/user-manual/clusters.adoc
index bde0249c57..c6f7861131 100644
--- a/docs/user-manual/clusters.adoc
+++ b/docs/user-manual/clusters.adoc
@@ -729,6 +729,21 @@ The default value is `-1`.
 
 It often makes sense to introduce a delay before redistributing as it's a 
common case that a consumer closes but another one quickly is created on the 
same queue, in such a case you probably don't want to redistribute immediately 
since the new consumer will arrive shortly.
 
+[WARNING]
+====
+The broker uses internal store-and-forward queues to handle messages which need 
to be sent to other nodes in the cluster.
+Clients *should not* directly send messages to these store-and-forward queues.
+If a client sends a message to a store-and-forward queue it will be sent to 
the corresponding dead-letter address.
+It's possible to prevent clients from sending any messages to these internal 
queues by revoking all security permissions using the following 
`security-setting`:
+
+[,xml]
+----
+<security-setting match="$.artemis.internal.sf.#"/>
+----
+
+This configuration will not impact the internal processes which manage the 
store-and-forward queues.
+====
+
 == Cluster topologies
 
 Apache ActiveMQ Artemis clusters can be connected together in many different 
topologies, let's consider the two most common ones here
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index 529ea1b6c9..2807071454 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -44,6 +38,7 @@ import 
org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
@@ -56,6 +51,12 @@ import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * This will simulate a failure of a failure. The bridge could eventually 
during a race or multiple failures not be able
  * to reconnect because it failed again. this should make the bridge to always 
reconnect itself.
@@ -412,6 +413,141 @@ public class ClusteredBridgeReconnectTest extends 
ClusterTestBase {
       stopServers(0, 1);
    }
 
+   @Test
+   public void testBadClientSendMessagesToSnFQueue() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      String dla = "DLA";
+      AddressSettings addressSettings = new 
AddressSettings().setDeadLetterAddress(SimpleString.of(dla));
+
+      servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
+      servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, dla, dla, null, true);
+      createQueue(1, dla, dla, null, true);
+
+      waitForBindings(0, dla, 1, 0, true);
+      waitForBindings(1, dla, 1, 0, true);
+
+      ClientSession session0 = sfs[0].createSession();
+      ClientSession session1 = sfs[1].createSession();
+
+      session0.start();
+      session1.start();
+
+      final long num = 10;
+
+      SimpleString nodeId1 = servers[1].getNodeID();
+      ClusterConnectionImpl cc0 = (ClusterConnectionImpl) 
servers[0].getClusterManager().getClusterConnection("cluster0");
+      SimpleString snfQueue0Name = cc0.getSfQueueName(nodeId1.toString());
+
+      ClientProducer badProducer0 = session0.createProducer(snfQueue0Name);
+      for (int i = 0; i < num; i++) {
+         Message msg = session0.createMessage(true);
+         msg.putStringProperty("origin", "from producer 0");
+         badProducer0.send(msg);
+      }
+
+      // add a remote queue and consumer to enable message to flow from node 0 
to node 1
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+      waitForBindings(0, "queues.testaddress", 0, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 0, 0, false);
+
+      for (int i = 0; i < num; i++) {
+         Message msg = session0.createMessage(true);
+         msg.putStringProperty("origin", "from producer 0");
+         badProducer0.send(msg);
+      }
+
+      // verify metrics on the SnF queue
+      Queue snfQueue0 = servers[0].locateQueue(snfQueue0Name);
+      Wait.assertEquals(num * 2, () -> snfQueue0.getMessagesAdded(), 2000, 10);
+      Wait.assertEquals(num * 2, () -> snfQueue0.getMessagesKilled(), 2000, 
10);
+      Wait.assertEquals(0L, () -> snfQueue0.getMessageCount(), 2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0.getDeliveringCount(), 2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0.getDeliveringSize(), 2000, 10);
+
+      // verify metrics on the DLA queue
+      Queue dlaQueue0 = servers[0].locateQueue(dla);
+      Wait.assertEquals(num * 2, () -> dlaQueue0.getMessagesAdded(), 2000, 10);
+      Wait.assertEquals(num * 2, () -> dlaQueue0.getMessageCount(), 2000, 10);
+
+      // messages will never reach the consumer
+      assertNull(consumer1.receiveImmediate());
+
+      ClientConsumer dlqConsumer = session0.createConsumer(dla);
+
+      for (int i = 0; i < num * 2; i++) {
+         ClientMessage m = dlqConsumer.receive(5000);
+         assertNotNull(m);
+         String propValue = m.getStringProperty("origin");
+         assertEquals("from producer 0", propValue);
+         m.acknowledge();
+      }
+      session0.commit();
+      assertNull(dlqConsumer.receiveImmediate());
+      Wait.assertEquals(num * 2, () -> dlaQueue0.getMessagesAcknowledged(), 
2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0.getMessageCount(), 2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0.getDeliveringCount(), 2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0.getDeliveringSize(), 2000, 10);
+
+      // normal message flow should work
+      ClientProducer goodProducer0 = 
session0.createProducer("queues.testaddress");
+      for (int i = 0; i < num; i++) {
+         Message msg = session0.createMessage(true);
+         msg.putStringProperty("origin", "from producer 0");
+         goodProducer0.send(msg);
+      }
+
+      // consumer1 can receive from node0
+      for (int i = 0; i < num; i++) {
+         ClientMessage m = consumer1.receive(5000);
+         assertNotNull(m);
+         String propValue = m.getStringProperty("origin");
+         assertEquals("from producer 0", propValue);
+         m.acknowledge();
+      }
+      assertNull(consumer1.receiveImmediate());
+
+      stopServers(0, 1);
+
+      // restart and verify all metrics again to ensure messages don't come 
back
+      startServers(0, 1);
+
+      // verify metrics on the SnF queue
+      Queue snfQueue0AfterRestart = servers[0].locateQueue(snfQueue0Name);
+      Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessagesAdded(), 
2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessagesKilled(), 
2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessageCount(), 
2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getDeliveringCount(), 
2000, 10);
+      Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getDeliveringSize(), 
2000, 10);
+
+      // verify metrics on the DLA queue
+      Queue dlaQueue0AfterRestart = servers[0].locateQueue(dla);
+      Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getMessagesAdded(), 
2000, 10);
+      Wait.assertEquals(0L, () -> 
dlaQueue0AfterRestart.getMessagesAcknowledged(), 2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getMessageCount(), 
2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getDeliveringCount(), 
2000, 10);
+      Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getDeliveringSize(), 
2000, 10);
+
+      stopServers(0, 1);
+   }
+
    @Override
    @AfterEach
    public void tearDown() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to