This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c8d7f8acf5 ARTEMIS-4924 handle invalid messages in SNF queue
c8d7f8acf5 is described below
commit c8d7f8acf57ba7f1cc3cef2b80905e347f2f03c0
Author: Howard Gao <[email protected]>
AuthorDate: Tue Jul 16 16:31:36 2024 +0800
ARTEMIS-4924 handle invalid messages in SNF queue
- Send invalid messages in SNF queue to DLA
- Add documentation for proper store-and-forward queue usage
---
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../artemis/core/server/ActiveMQServerLogger.java | 10 +-
.../core/server/cluster/impl/BridgeImpl.java | 14 +-
.../cluster/impl/ClusterConnectionBridge.java | 8 +-
docs/user-manual/clusters.adoc | 15 +++
.../bridge/ClusteredBridgeReconnectTest.java | 148 ++++++++++++++++++++-
6 files changed, 179 insertions(+), 19 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 014616d8fa..4a81abaaf2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -535,4 +535,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229257, value = "IDGenerator has been stopped")
RuntimeException idGeneratorStopped();
+ @Message(id = 229258, value = "Invalid cluster bridge message! No queue IDs
defined in the property {}")
+ ActiveMQIllegalStateException noQueueIdsDefined(SimpleString idsHeaderName);
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1a8253d5de..6a139815b1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -52,7 +52,7 @@ import
org.apache.activemq.artemis.spi.core.remoting.Connection;
/**
* Logger Codes 220000 - 228999
*/
-@LogBundle(projectCode = "AMQ", regexID = "22[0-8][0-9]{3}", retiredIDs =
{221026, 221052, 222003, 222012, 222015, 222020, 222021, 222022, 222024,
222027, 222028, 222029, 222048, 222052, 222058, 222064, 222071, 222078, 222079,
222083, 222084, 222088, 222090, 222102, 222105, 222128, 222134, 222135, 222152,
222159, 222163, 222167, 222170, 222171, 222182, 222190, 222192, 222193, 222204,
222252, 222255, 222257, 222259, 222260, 222276, 222277, 222288, 224001, 224002,
224003, 224005, 224013, 2 [...]
+@LogBundle(projectCode = "AMQ", regexID = "22[0-8][0-9]{3}", retiredIDs =
{221026, 221052, 222003, 222012, 222015, 222020, 222021, 222022, 222024,
222027, 222028, 222029, 222048, 222052, 222058, 222064, 222071, 222078, 222079,
222083, 222084, 222088, 222090, 222102, 222105, 222110, 222128, 222134, 222135,
222152, 222159, 222163, 222167, 222170, 222171, 222182, 222190, 222192, 222193,
222204, 222252, 222255, 222257, 222259, 222260, 222276, 222277, 222288, 224001,
224002, 224003, 224005, 2 [...]
public interface ActiveMQServerLogger {
// Note: logger ID 224127 uses
"org.apache.activemq.artemis.core.server.Queue" for its logger category, rather
than ActiveMQServerLogger.class.getPackage().getName()
@@ -577,11 +577,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222109, value = "Timed out waiting for write lock on
consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
void timeoutLockingConsumer(String consumer, String remoteAddress);
- @LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage
= {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
- void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
- org.apache.activemq.artemis.api.core.Message
messageCopy,
- SimpleString idsHeaderName);
-
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level
= LogMessage.Level.TRACE)
void managementOperationError(String op, String resourceName, Exception e);
@@ -1518,4 +1513,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224150, value = "Messages will be dropped on address {} /
queue {}. Queue is disabled.", level = LogMessage.Level.INFO)
void noRouteMessagesWillBeDropped(SimpleString addressName, SimpleString
queueName);
+
+ @LogMessage(id = 224151, value = "Bridge {} unable to handle message: {}.
Root cause: {}", level = LogMessage.Level.INFO)
+ void bridgeUnableToHandleMessage(SimpleString bridgeName, String message,
String exceptionMessage);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c52e4eab2b..0c52679a7a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -529,7 +529,7 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
/**
* Hook for processing message before forwarding
*/
- protected Message beforeForward(Message message, final SimpleString
forwardingAddress) {
+ protected Message beforeForward(Message message, final SimpleString
forwardingAddress) throws ActiveMQException {
message = message.copy();
((RefCountMessage)message).setParentRef((RefCountMessage)message);
@@ -630,7 +630,17 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
dest = ref.getMessage().getAddressSimpleString();
}
- final Message message = beforeForward(ref.getMessage(), dest);
+ final Message message;
+ try {
+ message = beforeForward(ref.getMessage(), dest);
+ } catch (ActiveMQException ex) {
+
ActiveMQServerLogger.LOGGER.bridgeUnableToHandleMessage(getName(),
ref.getMessage().toString(), ex.getMessage());
+ ref.getQueue().sendToDeadLetterAddress(null, ref);
+ synchronized (refs) {
+ refs.remove(ref.getMessageID());
+ }
+ return HandleStatus.HANDLED;
+ }
pendingAcks.countUp();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 64235fdd25..93e5314e32 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -47,6 +47,7 @@ import
org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
@@ -185,7 +186,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
- protected Message beforeForward(final Message message, final SimpleString
forwardingAddress) {
+ protected Message beforeForward(final Message message, final SimpleString
forwardingAddress) throws ActiveMQException {
// We make a copy of the message, then we strip out the unwanted routing
id headers and leave
// only
// the one pertinent for the address node - this is important since
different queues on different
@@ -200,11 +201,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
Set<SimpleString> propNames = new
HashSet<>(messageCopy.getPropertyNames());
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
-
if (queueIds == null) {
- // Sanity check only
- ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy,
idsHeaderName);
- throw new IllegalStateException("no queueIDs defined");
+ throw ActiveMQMessageBundle.BUNDLE.noQueueIdsDefined(idsHeaderName);
}
for (SimpleString propName : propNames) {
diff --git a/docs/user-manual/clusters.adoc b/docs/user-manual/clusters.adoc
index bde0249c57..c6f7861131 100644
--- a/docs/user-manual/clusters.adoc
+++ b/docs/user-manual/clusters.adoc
@@ -729,6 +729,21 @@ The default value is `-1`.
It often makes sense to introduce a delay before redistributing as it's a
common case that a consumer closes but another one quickly is created on the
same queue, in such a case you probably don't want to redistribute immediately
since the new consumer will arrive shortly.
+[WARNING]
+====
+The broker uses internal store-and-forward queues to handle messages which need
to be sent to other nodes in the cluster.
+Clients *should not* directly send messages to these store-and-forward queues.
+If a client sends a message to a store-and-forward queue it will be sent to
the corresponding dead-letter address.
+It's possible to prevent clients from sending any messages to these internal
queues by revoking all security permissions using the following
`security-setting`:
+
+[,xml]
+----
+<security-setting match="$.artemis.internal.sf.#"/>
+----
+
+This configuration will not impact the internal processes which manage the
store-and-forward queues.
+====
+
== Cluster topologies
Apache ActiveMQ Artemis clusters can be connected together in many different
topologies, let's consider the two most common ones here
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index 529ea1b6c9..2807071454 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -44,6 +38,7 @@ import
org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
@@ -56,6 +51,12 @@ import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* This will simulate a failure of a failure. The bridge could eventually
during a race or multiple failures not be able
* to reconnect because it failed again. this should make the bridge to always
reconnect itself.
@@ -412,6 +413,141 @@ public class ClusteredBridgeReconnectTest extends
ClusterTestBase {
stopServers(0, 1);
}
+ @Test
+ public void testBadClientSendMessagesToSnFQueue() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+ String dla = "DLA";
+ AddressSettings addressSettings = new
AddressSettings().setDeadLetterAddress(SimpleString.of(dla));
+
+ servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
+ servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, dla, dla, null, true);
+ createQueue(1, dla, dla, null, true);
+
+ waitForBindings(0, dla, 1, 0, true);
+ waitForBindings(1, dla, 1, 0, true);
+
+ ClientSession session0 = sfs[0].createSession();
+ ClientSession session1 = sfs[1].createSession();
+
+ session0.start();
+ session1.start();
+
+ final long num = 10;
+
+ SimpleString nodeId1 = servers[1].getNodeID();
+ ClusterConnectionImpl cc0 = (ClusterConnectionImpl)
servers[0].getClusterManager().getClusterConnection("cluster0");
+ SimpleString snfQueue0Name = cc0.getSfQueueName(nodeId1.toString());
+
+ ClientProducer badProducer0 = session0.createProducer(snfQueue0Name);
+ for (int i = 0; i < num; i++) {
+ Message msg = session0.createMessage(true);
+ msg.putStringProperty("origin", "from producer 0");
+ badProducer0.send(msg);
+ }
+
+ // add a remote queue and consumer to enable message to flow from node 0
to node 1
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+ waitForBindings(0, "queues.testaddress", 0, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 0, 0, false);
+
+ for (int i = 0; i < num; i++) {
+ Message msg = session0.createMessage(true);
+ msg.putStringProperty("origin", "from producer 0");
+ badProducer0.send(msg);
+ }
+
+ // verify metrics on the SnF queue
+ Queue snfQueue0 = servers[0].locateQueue(snfQueue0Name);
+ Wait.assertEquals(num * 2, () -> snfQueue0.getMessagesAdded(), 2000, 10);
+ Wait.assertEquals(num * 2, () -> snfQueue0.getMessagesKilled(), 2000,
10);
+ Wait.assertEquals(0L, () -> snfQueue0.getMessageCount(), 2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0.getDeliveringCount(), 2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0.getDeliveringSize(), 2000, 10);
+
+ // verify metrics on the DLA queue
+ Queue dlaQueue0 = servers[0].locateQueue(dla);
+ Wait.assertEquals(num * 2, () -> dlaQueue0.getMessagesAdded(), 2000, 10);
+ Wait.assertEquals(num * 2, () -> dlaQueue0.getMessageCount(), 2000, 10);
+
+ // messages will never reach the consumer
+ assertNull(consumer1.receiveImmediate());
+
+ ClientConsumer dlqConsumer = session0.createConsumer(dla);
+
+ for (int i = 0; i < num * 2; i++) {
+ ClientMessage m = dlqConsumer.receive(5000);
+ assertNotNull(m);
+ String propValue = m.getStringProperty("origin");
+ assertEquals("from producer 0", propValue);
+ m.acknowledge();
+ }
+ session0.commit();
+ assertNull(dlqConsumer.receiveImmediate());
+ Wait.assertEquals(num * 2, () -> dlaQueue0.getMessagesAcknowledged(),
2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0.getMessageCount(), 2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0.getDeliveringCount(), 2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0.getDeliveringSize(), 2000, 10);
+
+ // normal message flow should work
+ ClientProducer goodProducer0 =
session0.createProducer("queues.testaddress");
+ for (int i = 0; i < num; i++) {
+ Message msg = session0.createMessage(true);
+ msg.putStringProperty("origin", "from producer 0");
+ goodProducer0.send(msg);
+ }
+
+ // consumer1 can receive from node0
+ for (int i = 0; i < num; i++) {
+ ClientMessage m = consumer1.receive(5000);
+ assertNotNull(m);
+ String propValue = m.getStringProperty("origin");
+ assertEquals("from producer 0", propValue);
+ m.acknowledge();
+ }
+ assertNull(consumer1.receiveImmediate());
+
+ stopServers(0, 1);
+
+ // restart and verify all metrics again to ensure messages don't come
back
+ startServers(0, 1);
+
+ // verify metrics on the SnF queue
+ Queue snfQueue0AfterRestart = servers[0].locateQueue(snfQueue0Name);
+ Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessagesAdded(),
2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessagesKilled(),
2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getMessageCount(),
2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getDeliveringCount(),
2000, 10);
+ Wait.assertEquals(0L, () -> snfQueue0AfterRestart.getDeliveringSize(),
2000, 10);
+
+ // verify metrics on the DLA queue
+ Queue dlaQueue0AfterRestart = servers[0].locateQueue(dla);
+ Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getMessagesAdded(),
2000, 10);
+ Wait.assertEquals(0L, () ->
dlaQueue0AfterRestart.getMessagesAcknowledged(), 2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getMessageCount(),
2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getDeliveringCount(),
2000, 10);
+ Wait.assertEquals(0L, () -> dlaQueue0AfterRestart.getDeliveringSize(),
2000, 10);
+
+ stopServers(0, 1);
+ }
+
@Override
@AfterEach
public void tearDown() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact