This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fdcf640f2b ARTEMIS-5669 AMQP bridge receivers prefer modified
dispositions by default
fdcf640f2b is described below
commit fdcf640f2b42a37ad54982579ef1e6b6b1405833
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Sep 18 14:37:35 2025 -0400
ARTEMIS-5669 AMQP bridge receivers prefer modified dispositions by default
AMQP Bridge receiver links configure themselves to a default of sending a
modified
disposition for address full errors to allow the remote sender to redeliver
the message
instead of the broker default which is to send rejected dispositions
meaning the remote
must discard or DLQ the message. Allows for configuration at the bridge
policy level
to override this and ignore the connector URI as this is an opinionated
configuration for
bridged resources. Also adds ability to configure the drain on address full
and delivery
is rejected at the bridge configuration level.
---
.../connect/bridge/AMQPBridgeConfiguration.java | 46 +++++
.../amqp/connect/bridge/AMQPBridgeConstants.java | 37 +++-
.../bridge/AMQPBridgeFromAddressReceiver.java | 15 ++
.../bridge/AMQPBridgeFromQueueReceiver.java | 15 ++
.../bridge/AMQPBridgeReceiverConfiguration.java | 45 +++++
.../amqp/connect/AMQPBridgeFromAddressTest.java | 189 +++++++++++++++++-
.../amqp/connect/AMQPBridgeFromQueueTest.java | 219 +++++++++++++++++++++
.../connect/AMQPFederationQueuePolicyTest.java | 2 -
8 files changed, 560 insertions(+), 8 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
index b646b37346..de2800af31 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
@@ -30,6 +30,9 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.AUTO_DELETE_DURABLE_SUBSCRIPTION;
@@ -55,6 +58,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_SEND_SETTLED;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -378,4 +382,46 @@ public class AMQPBridgeConfiguration {
return DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
}
}
+
+ /**
+ * (@return the use modified for transient delivery errors configuration}
+ */
+ public boolean isUseModifiedForTransientDeliveryErrors() {
+ final Object property =
properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS);
+ if (property instanceof Boolean booleanValue) {
+ return booleanValue;
+ } else if (property instanceof String string) {
+ return Boolean.parseBoolean(string);
+ } else {
+ return DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+ }
+ }
+
+ /**
+ * (@return the drain link credit on transient delivery errors
configuration}
+ */
+ public boolean isDrainOnTransientDeliveryErrors() {
+ final Object property =
properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS);
+ if (property instanceof Boolean booleanValue) {
+ return booleanValue;
+ } else if (property instanceof String string) {
+ return Boolean.parseBoolean(string);
+ } else {
+ return
connection.getProtocolManager().isDrainOnTransientDeliveryErrors();
+ }
+ }
+
+ /**
+ * {@return the bridge receiver link quiesce timeout configuration}
+ */
+ public int getLinkQuiesceTimeout() {
+ final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT);
+ if (property instanceof Number number) {
+ return number.intValue();
+ } else if (property instanceof String string) {
+ return Integer.parseInt(string);
+ } else {
+ return connection.getProtocolManager().getLinkQuiesceTimeout();
+ }
+ }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
index 66fcbf84a0..0d060de85c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
@@ -271,7 +271,7 @@ public final class AMQPBridgeConstants {
/**
* Default value for the auto delete address sender durable subscription
binding.
*/
- public static boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION = false;
+ public static final boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION =
false;
/**
* Encodes a boolean value that indicates if AMQP bridge senders should
configure an auto delete option
@@ -284,7 +284,7 @@ public final class AMQPBridgeConstants {
/**
* Default value for the auto delete address sender message count for
durable subscription bindings.
*/
- public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT = 0;
+ public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT
= 0;
/**
* Encodes a signed long value that controls the delay before auto deletion
if using durable address
@@ -295,7 +295,7 @@ public final class AMQPBridgeConstants {
/**
* Default value for the auto delete address sender message count for
durable subscription bindings.
*/
- public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0;
+ public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0;
/**
* Encodes a signed long value that controls the message count value that
allows for address auto delete
@@ -303,4 +303,35 @@ public final class AMQPBridgeConstants {
*/
public static final String AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY =
"auto-delete-durable-subscription-delay";
+ /**
+ * Configuration property for how a bridge receiver should respond to
delivery errors indicating that an address is
+ * full and cannot accept messages at this time. By default we want to send
Modified outcomes with the delivery failed
+ * value set to true such that the remote will deliver the message again
after incrementing the delivery count of the
+ * message.
+ */
+ public static final String USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS =
"amqpUseModifiedForTransientDeliveryErrors";
+
+ /**
+ * Default value for how a bridge receiver should respond to delivery
errors indicating that an address is full
+ * and cannot accept messages at this time. By default we want to send
Modified outcomes with the delivery failed
+ * value set to true such that the remote will deliver the message again
after incrementing the delivery count of
+ * the message. This is an opinionated choice and the value set on the
connector URI is not referenced by bridges
+ * as we want to maintain this behavior unless specifically set on bridge
configuration explicitly.
+ */
+ public static final boolean
DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = true;
+
+ /**
+ * Configuration property that defines the time in milliseconds that a
receiver will wait before considering a pending
+ * quiesce timeout to have failed and should close the link. This option
can be used to override the value specified on
+ * the connector URI to allow bridges to operate with a different default.
+ */
+ public static final String RECEIVER_LINK_QUIESCE_TIMEOUT =
"amqpLinkQuiesceTimeout";
+
+ /**
+ * Configuration property that defines if a bridge receiver should drain
the link credit when a transient delivery
+ * error such as the address being full occurs. This option can be used to
override the value specified on the
+ * connector URI to allow bridges to operate with a different default.
+ */
+ public static final String RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS =
"amqpDrainOnTransientDeliveryErrors";
+
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
index de12f2a4bb..e879368706 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
@@ -276,6 +276,21 @@ public class AMQPBridgeFromAddressReceiver extends
AMQPBridgeReceiver {
}
}
+ @Override
+ protected boolean
isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) {
+ return configuration.isUseModifiedForTransientDeliveryErrors();
+ }
+
+ @Override
+ protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext
connection) {
+ return configuration.isDrainOnTransientDeliveryErrors();
+ }
+
+ @Override
+ protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) {
+ return configuration.getLinkQuiesceTimeout();
+ }
+
@Override
protected Runnable createCreditRunnable(AMQPConnectionContext
connection) {
// We defer to the configuration instance as opposed to the base
class version that reads
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
index 85eab69100..2ecdb732fc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
@@ -235,6 +235,21 @@ public class AMQPBridgeFromQueueReceiver extends
AMQPBridgeReceiver {
this.localQueue = localQueue;
}
+ @Override
+ protected boolean
isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) {
+ return configuration.isUseModifiedForTransientDeliveryErrors();
+ }
+
+ @Override
+ protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext
connection) {
+ return configuration.isDrainOnTransientDeliveryErrors();
+ }
+
+ @Override
+ protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) {
+ return configuration.getLinkQuiesceTimeout();
+ }
+
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
index b7215f7c45..24e0d9ece5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
@@ -21,7 +21,10 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.QUEUE_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LARGE_MESSAGE_THRESHOLD;
@@ -210,4 +213,46 @@ public final class AMQPBridgeReceiverConfiguration extends
AMQPBridgeLinkConfigu
return configuration.isPreferSharedDurableSubscriptions();
}
}
+
+ /**
+ * (@return the use modified for transient delivery errors configuration}
+ */
+ public boolean isUseModifiedForTransientDeliveryErrors() {
+ final Object property =
properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS);
+ if (property instanceof Boolean booleanValue) {
+ return booleanValue;
+ } else if (property instanceof String string) {
+ return Boolean.parseBoolean(string);
+ } else {
+ return configuration.isUseModifiedForTransientDeliveryErrors();
+ }
+ }
+
+ /**
+ * (@return the drain link credit on transient delivery errors
configuration}
+ */
+ public boolean isDrainOnTransientDeliveryErrors() {
+ final Object property =
properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS);
+ if (property instanceof Boolean booleanValue) {
+ return booleanValue;
+ } else if (property instanceof String string) {
+ return Boolean.parseBoolean(string);
+ } else {
+ return configuration.isDrainOnTransientDeliveryErrors();
+ }
+ }
+
+ /**
+ * {@return the federation receiver link quiesce timeout configuration}
+ */
+ public int getLinkQuiesceTimeout() {
+ final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT);
+ if (property instanceof Number number) {
+ return number.intValue();
+ } else if (property instanceof String string) {
+ return Integer.parseInt(string);
+ } else {
+ return configuration.getLinkQuiesceTimeout();
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
index 5d0bbdbbcc..4bdcf2e2a9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
@@ -32,7 +32,10 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -71,6 +74,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@@ -925,7 +930,7 @@ class AMQPBridgeFromAddressTest extends
AmqpClientTestSupport {
server.start();
server.deployDivert(divertConfig);
// Current implementation requires the source address exist on the
local broker before it
- // will attempt to federate it from the remote.
+ // will attempt to bridge it from the remote.
server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()),
RoutingType.MULTICAST));
// Demand on the forwarding address should create a remote consumer
for the forwarded address.
@@ -999,7 +1004,7 @@ class AMQPBridgeFromAddressTest extends
AmqpClientTestSupport {
server.start();
server.deployDivert(divertConfig);
// Current implementation requires the source address exist on the
local broker before it
- // will attempt to federate it from the remote.
+ // will attempt to bridge it from the remote.
server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()),
RoutingType.MULTICAST));
// Demand on the forwarding address should create a remote consumer
for the forwarded address.
@@ -1824,7 +1829,7 @@ class AMQPBridgeFromAddressTest extends
AmqpClientTestSupport {
@Test
@Timeout(20)
- public void
testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndFederateAddress()
throws Exception {
+ public void
testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndBridgeAddress()
throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
@@ -3879,6 +3884,184 @@ class AMQPBridgeFromAddressTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeAddressPolicyElement receiveFromAddress = new
AMQPBridgeAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getTestName());
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(getTestName(),
addressSettings);
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "A".repeat(2048);
+
+ peer.expectAttach().ofReceiver()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false)
+
.setFilterString("color='red'"));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectDisposition().withState().accepted(); // This should
fill the address
+ peer.expectFlow().withLinkCredit(998).withDrain(true); // Receiver
drains credit before sending the disposition
+ peer.expectDisposition().withState().modified(true); // Expect
modified / failed so remote doesn't drop the message
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
+ @Test
+ public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel()
throws Exception {
+ doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true);
+ }
+
+ @Test
+ public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel()
throws Exception {
+ doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false);
+ }
+
+ private void
doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean
drainOnFull) throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeAddressPolicyElement receiveFromAddress = new
AMQPBridgeAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+ element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS,
String.valueOf(drainOnFull));
+ element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getTestName());
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(1000);
+ server.getAddressSettingsRepository().addMatch(getTestName(),
addressSettings);
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "A".repeat(2048);
+
+ peer.expectAttach().ofReceiver()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted(); // This should fill
the address
+
+ if (drainOnFull) {
+ peer.expectFlow().withDrain(true).withLinkCredit(998);
+ peer.expectDisposition().withState().modified(true);
+ peer.expectDetach().withError(notNullValue()).respond();
+ } else {
+ peer.expectDisposition().withState().modified(true);
+ }
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(5);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
public static class ApplicationPropertiesTransformer implements Transformer
{
private final Map<String, String> properties = new HashMap<>();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
index b2de17225f..9cb572a642 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
@@ -32,11 +32,14 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -69,6 +72,8 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@@ -3765,6 +3770,220 @@ public class AMQPBridgeFromQueueTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeQueuePolicyElement receiveFromQueue = new
AMQPBridgeQueuePolicyElement();
+ receiveFromQueue.setName("queue-policy");
+ receiveFromQueue.addToIncludes(getTestName(), getTestName());
+ receiveFromQueue.addProperty(RECEIVER_QUIESCE_TIMEOUT, 10_000);
+ receiveFromQueue.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 250);
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromQueuePolicy(receiveFromQueue);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getTestName());
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(getTestName(),
addressSettings);
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "A".repeat(2048);
+
+ peer.expectAttach().ofReceiver()
+ .withName(allOf(containsString(getTestName()),
+ containsString("queue-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory(
+ "AMQP", "tcp://localhost:" + AMQP_PORT +
"?jms.prefetchPolicy.all=0");
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue(getTestName());
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectDisposition().withState().accepted(); // This
should fill the address
+ peer.expectFlow().withLinkCredit(998).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDisposition().withState().modified(true); // Expect
modified / failed so remote doesn't drop the message
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withApplicationProperties().also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withApplicationProperties().also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ // Address remains full so no new credit is issued and a clean
detach occurs next.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectDetach().respond();
+
+ consumer.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
+ @Test
+ public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel()
throws Exception {
+ doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true);
+ }
+
+ @Test
+ public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel()
throws Exception {
+ doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false);
+ }
+
+ private void
doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean
drainOnFull) throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeQueuePolicyElement receiveFromQueue = new
AMQPBridgeQueuePolicyElement();
+ receiveFromQueue.setName("queue-policy");
+ receiveFromQueue.addToIncludes(getTestName(), getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromQueuePolicy(receiveFromQueue);
+ element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS,
String.valueOf(drainOnFull));
+ element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
+ element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch(getTestName());
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(getTestName(),
addressSettings);
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "A".repeat(2048);
+
+ peer.expectAttach().ofReceiver()
+ .withName(allOf(containsString(getTestName()),
+ containsString("queue-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory(
+ "AMQP", "tcp://localhost:" + AMQP_PORT +
"?jms.prefetchPolicy.all=0");
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue(getTestName());
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted(); // This should
fill the address
+
+ if (drainOnFull) {
+ peer.expectFlow().withDrain(true).withLinkCredit(998);
+ peer.expectDisposition().withState().modified(true);
+ peer.expectDetach().withError(notNullValue()).respond();
+ } else {
+ peer.expectDisposition().withState().modified(true);
+ }
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(5);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ if (!drainOnFull) {
+ peer.expectFlow().withDrain(true).withLinkCredit(998)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+ }
+
+ consumer.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
public static class ApplicationPropertiesTransformer implements Transformer
{
private final Map<String, String> properties = new HashMap<>();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 5b217fd31d..bdaf6cd706 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -90,7 +90,6 @@ import org.slf4j.LoggerFactory;
import static
org.apache.activemq.artemis.core.config.WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
@@ -5458,7 +5457,6 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
- element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS,
String.valueOf(drainOnFull));
element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact