This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a2bb690e46 ARTEMIS-5953 Allow address federation bindings to group
under a wildcard
a2bb690e46 is described below
commit a2bb690e468d7956a4c83f10efa65f09a6d66f45
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 13 16:15:56 2026 -0400
ARTEMIS-5953 Allow address federation bindings to group under a wildcard
Add configuration that enables the address federation policy matches to
define whether a remote wildcard address will be used to host the address
receiver bindings for the matching addresses when the match value is itself
a wildcard. This can be useful when federating large numbers of addresses
that have low traffic, as it reduces the number of addresses that need to
exist on the remote federation target at the expense of higher CPU load on
the target in order to route messages to the appropriate wildcard bindings
for the matched addresses. The default value is `false`, in which case the
federation receiver address bindings are attached to the matching addresses
on the remote as they always have been. When enabled, the remote broker must
have wildcard routing enabled for this feature to work correctly.
---
.../AMQPFederationAddressBindingsConsumer.java | 2 +-
.../federation/AMQPFederationAddressConsumer.java | 12 +-
.../AMQPFederationAddressPolicyManager.java | 183 +++++---
.../federation/AMQPFederationConstants.java | 8 +
.../AMQPFederationConsumerControlType.java | 4 +-
.../AMQPFederationGenericConsumerInfo.java | 69 ++-
.../AMQPFederationManagementSupport.java | 32 +-
.../federation/AMQPFederationPolicySupport.java | 47 +-
.../federation/AMQPFederationQueueConsumer.java | 9 +-
.../AMQPFederationQueuePolicyManager.java | 3 +-
.../amqp/federation/FederationConsumerInfo.java | 29 +-
.../FederationReceiveFromAddressPolicy.java | 61 ++-
.../AMQPFederationPolicySupportTest.java | 57 ++-
.../AMQPFederationAddressPolicyElement.java | 15 +-
.../deployers/impl/FileConfigurationParser.java | 2 +
.../resources/schema/artemis-configuration.xsd | 1 +
.../AMQPFederationAddressPolicyElementTest.java | 8 +
.../core/config/impl/ConfigurationImplTest.java | 3 +
.../resources/ConfigurationTest-full-config.xml | 4 +-
.../amqp-federation-configuration-glossary.adoc | 6 +
.../connect/AMQPFederationAddressPolicyTest.java | 511 ++++++++++++++++++++-
.../amqp/connect/AMQPFederationConnectTest.java | 3 +
.../amqp/connect/AMQPFederationManagementTest.java | 3 +
.../connect/AMQPFederationServerToServerTest.java | 290 ++++++++++++
24 files changed, 1175 insertions(+), 187 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
index 8499eaabf0..7abe3021c6 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
@@ -146,7 +146,7 @@ public final class AMQPFederationAddressBindingsConsumer
extends AMQPFederationA
delivery.setContext(message);
- message.setAddress(consumerInfo.getAddress());
+ message.setAddress(consumerInfo.getTargetAddress());
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
if (message.getMessageID() <= 0) {
message.setMessageID(storageManager.generateID());
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index 483e9729cd..28602201be 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -96,7 +96,7 @@ public abstract class AMQPFederationAddressConsumer extends
AMQPFederationConsum
if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
return "federation-" + federation.getName() +
"-policy-" + policy.getPolicyName() +
- "-address-receiver-" + consumerInfo.getAddress() +
+ "-address-receiver-" + consumerInfo.getTargetAddress() +
"-" + federation.getServer().getNodeID() +
"-" + LINK_SEQUENCE_ID.incrementAndGet();
} else {
@@ -107,7 +107,7 @@ public abstract class AMQPFederationAddressConsumer extends
AMQPFederationConsum
// connection recovery which is arguably less broken than the
sequence ID variant which
// creates unstable subscription queues that can be orphaned or
consumed out of order.
return "federation-" + federation.getName() +
- "-address-receiver-" + consumerInfo.getAddress() +
+ "-address-receiver-" + consumerInfo.getTargetAddress() +
"-" + federation.getServer().getNodeID();
}
}
@@ -146,12 +146,12 @@ public abstract class AMQPFederationAddressConsumer
extends AMQPFederationConsum
source.setFilter(filtersMap);
if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
- source.setAddress(consumerInfo.getFqqn());
+ source.setAddress(consumerInfo.getSourceFqqn());
} else {
- source.setAddress(consumerInfo.getAddress()); // Legacy behavior
+ source.setAddress(consumerInfo.getTargetAddress()); // Legacy
behavior where both sides would be the target value
}
- target.setAddress(consumerInfo.getAddress());
+ target.setAddress(consumerInfo.getTargetAddress());
final Map<String, Object> addressSourceProperties = new HashMap<>();
// If the remote needs to create the address then it should apply
these
@@ -303,7 +303,7 @@ public abstract class AMQPFederationAddressConsumer extends
AMQPFederationConsum
AMQPFederatedAddressDeliveryHandler(AMQPSessionContext session,
FederationConsumerInfo consumerInfo, Receiver receiver) {
super(session.getSessionSPI(), session.getAMQPConnectionContext(),
session, receiver);
- this.cachedAddress = SimpleString.of(consumerInfo.getAddress());
+ this.cachedAddress = SimpleString.of(consumerInfo.getTargetAddress());
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 6703715492..88789c20b6 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -48,7 +48,6 @@ import
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
-import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,21 +253,10 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
return;
}
- // Target brokers which have been sent remote federation policies
might not have write
- // access via the logged in user to the address with demand which
we are attempting to
- // federate messages to so instead of creating a receiver that
will fail when the remote
- // routes a message to it we can just omit creating the link in
the first place.
- if (federation.isFederationTarget()) {
- try {
- session.getSessionSPI().check(addressInfo.getName(),
CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
- } catch (ActiveMQSecurityException e) {
-
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
- addressInfo.getName().toString(), "User does not have
send permission to configured address.");
- return;
- } catch (Exception ex) {
- logger.warn("Caught unknown exception from security check on
address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
- return;
- }
+ // If unable to write to the given address then we can stop here
and not create a remote
+ // receiver that will fail once a message is routed.
+ if (isAddressSecurityBlockingWrites(addressInfo.getName())) {
+ return;
}
createOrUpdateFederatedAddressConsumerForBinding(addressInfo,
queueBinding);
@@ -298,6 +286,27 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
return false;
}
+ private boolean isAddressSecurityBlockingWrites(SimpleString address) {
+ // Target brokers which have been sent remote federation policies might
not have write
+ // access via the logged in user to the address with demand which we are
attempting to
+ // federate messages to so instead of creating a receiver that will fail
when the remote
+ // routes a message to it we can just omit creating the link in the
first place.
+ if (federation.isFederationTarget()) {
+ try {
+ session.getSessionSPI().check(address, CheckType.SEND,
federation.getConnectionContext().getSecurityAuth());
+ } catch (ActiveMQSecurityException e) {
+
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
+ address.toString(), "User does not have send permission to
configured address.");
+ return true;
+ } catch (Exception ex) {
+ logger.warn("Caught unknown exception from security check on
address:{} send permissions: cannot federate:", address, ex);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding
divertBinding) {
if (!policy.isEnableDivertBindings()) {
return;
@@ -309,21 +318,9 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
return;
}
- // Target brokers which have been sent remote federation policies might
not have write access
- // via the logged in user to the address this divert is attached to
which means we don't need
- // to track this divert. Since we don't add the divert to the tracking
map future demand on
- // divert that would otherwise match the address includes won't trigger
federation attempts.
- if (federation.isFederationTarget()) {
- try {
- session.getSessionSPI().check(addressInfo.getName(),
CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
- } catch (ActiveMQSecurityException e) {
-
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
- addressInfo.getName().toString(), "User does not have send
permission to configured address.");
- return;
- } catch (Exception ex) {
- logger.warn("Caught unknown exception from security check on
address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
- return;
- }
+ // Don't track the divert if we cannot write to the address with the
divert
+ if (isAddressSecurityBlockingWrites(addressInfo.getName())) {
+ return;
}
// We only need to check if we've never seen the divert before,
afterwards we will
@@ -486,6 +483,26 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
return policy.test(address, type);
}
+ /**
+ * Called when demand for an address has been determined to trigger the need
for a remote receiver to
+ * federate messages back to this peer and wildcard subscriptions have been
enabled. The value returned
+ * is the first configured match in the policy. If there are multiple
matchers in the policy that
+ * could match on this address, only the first one located is
returned; having more than one
+ * positive matcher is a configuration issue and the user is on the hook if
this results in unexpected
+ * behaviors from the federation manager. The result could be an exact
match to the address name or it
+ * could be a wildcard pattern match that captures this address in its
scope.
+ *
+ * @param address
+ * The address that has triggered demand and requires a new remote
consumer.
+ * @param type
+ * The routing type that the address was created with.
+ *
+ * @return the first matcher string from the configured address policy that
matches on the target address
+ */
+ private String getMatcherForAddress(String address, RoutingType type) {
+ return policy.getFirstMatchingAddressPattern(address, type);
+ }
+
private static boolean isAddressInDivertForwards(final SimpleString
targetAddress, final SimpleString forwardAddress) {
final SimpleString[] forwardAddresses = forwardAddress.split(',');
@@ -572,10 +589,13 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
AMQPFederationAddressConsumerManager<?> consumerManager =
registry.get(key);
if (consumerManager == null) {
- if (isUseConduitConsumer()) {
- registry.put(key, consumerManager = new
AMQPFederationAddressConduitConsumerManager(manager,
createConsumerInfo(addressInfo, binding), addressInfo));
+ final boolean useConduitConsumer = isUseConduitConsumer();
+ final FederationConsumerInfo consumerInfo =
createConsumerInfo(binding, useConduitConsumer);
+
+ if (useConduitConsumer) {
+ registry.put(key, consumerManager = new
AMQPFederationAddressConduitConsumerManager(manager, consumerInfo,
addressInfo));
} else {
- registry.put(key, consumerManager = new
AMQPFederationAddressBindingsConsumerManager(manager,
createConsumerInfo(addressInfo, binding), addressInfo));
+ registry.put(key, consumerManager = new
AMQPFederationAddressBindingsConsumerManager(manager, consumerInfo,
addressInfo));
}
}
@@ -619,37 +639,67 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
}
/**
- * Create a new {@link FederationConsumerInfo} based on the given {@link
AddressInfo} and the configured
- * {@link FederationReceiveFromAddressPolicy}.
+ * Create a new {@link FederationConsumerInfo} based on the given data
from this address consumer
+ * registry and the binding that generated the demand that triggered
creation of a consumer.
*
- * @param address
- * The {@link AddressInfo} to use as a basis for the consumer
information object.
* @param binding
* The {@link Binding} that is the source of demand for this
consumer info object
+ * @param conduitConsumer
+ * Is the consumer being created a conduit consumer or a direct to
bindings consumer.
*
* @return a new {@link FederationConsumerInfo} instance based on the
given address
*/
- private FederationConsumerInfo createConsumerInfo(AddressInfo address,
Binding binding) {
- final String addressName = address.getName().toString();
- final boolean ignoreBindingFilters = isUseConduitConsumer() ||
binding.getFilter() == null;
- final String generatedQueueName = generateQueueName(address, binding,
ignoreBindingFilters);
- final String consumerFilter;
-
- if (ignoreBindingFilters) {
- consumerFilter = baseConsumerFilter;
- } else if (baseConsumerFilter != null) {
- consumerFilter = "(" + binding.getFilter().getFilterString() + ")
AND " + baseConsumerFilter;
+ private FederationConsumerInfo createConsumerInfo(Binding binding,
boolean conduitConsumer) {
+ final boolean allowWildcardGroupings = isAllowWildcardGroupings();
+ final String targetAddress = addressInfo.getName().toString();
+ final String sourceAddress;
+ final String sourceFilter;
+
+ if (allowWildcardGroupings) {
+ final String matcher =
manager.getMatcherForAddress(addressInfo.getName().toString(),
addressInfo.getRoutingType());
+
+ if (manager.getWildcardConfiguration().isWild(matcher)) {
+ sourceAddress = matcher;
+ sourceFilter = "AMQAddress='" + targetAddress + "'";
+ } else {
+ sourceAddress = targetAddress;
+ sourceFilter = null;
+ }
} else {
- consumerFilter = binding.getFilter().getFilterString().toString();
+ sourceAddress = targetAddress;
+ sourceFilter = null;
}
+ final boolean ignoreBindingFilters = conduitConsumer ||
binding.getFilter() == null;
+ final CharSequence bindingFilter = ignoreBindingFilters ? null :
binding.getFilter().getFilterString();
+ final String generatedQueueName = generateQueueName(binding,
ignoreBindingFilters);
+ final StringBuilder consumerFilter = new StringBuilder();
+
+ appendFilter(consumerFilter, sourceFilter, false); // Per
address filter for wildcard groups
+ appendFilter(consumerFilter, baseConsumerFilter, false); // Base
federation filter for maxHops etc
+ appendFilter(consumerFilter, bindingFilter, true); // Bindings
filter if supporting per address filters
+
return new
AMQPFederationGenericConsumerInfo(FederationConsumerInfo.Role.ADDRESS_CONSUMER,
- addressName,
- generatedQueueName,
- address.getRoutingType(),
- consumerFilter,
- CompositeAddress.toFullyQualified(addressName, generatedQueueName),
- ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
+ sourceAddress,
+ targetAddress,
+ generatedQueueName,
+
addressInfo.getRoutingType(),
+ consumerFilter.isEmpty()
? null : consumerFilter.toString(),
+
ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
+ }
+
+ private static void appendFilter(StringBuilder builder, CharSequence
filter, boolean parenthesesIfNeeded) {
+ if (filter != null) {
+ if (!builder.isEmpty()) {
+ builder.append(" AND ");
+ }
+
+ if (parenthesesIfNeeded && !builder.isEmpty()) {
+ builder.append("(").append(filter).append(")");
+ } else {
+ builder.append(filter);
+ }
+ }
}
private boolean isUseConduitConsumer() {
@@ -671,16 +721,33 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
}
}
- private String generateQueueName(AddressInfo address, Binding binding,
boolean ignoreFilters) {
+ /*
+ * Should we allow the existing remote receivers to be bound to a
wildcard address instead of an address
+ * that matches the target? We cannot if FQQN address subscriptions are not
in use, but otherwise we can so
+ * long as we account for the messages that will arrive at the wildcard root
address that don't match the local target
+ * via a filter for the AMQAddress value matching the receiver target
address.
+ */
+ private boolean isAllowWildcardGroupings() {
+ if (!manager.getCapabilities().isUseFQQNAddressSubscriptions()) {
+ // Prior to FQQN subscription support we used a simple link name
that would not be unique amongst
+ // multiple consumers on the same address, so for most features or
behaviors that come later we cannot
+ // action them properly; wildcard groupings are therefore not
allowed.
+ return false;
+ } else {
+ return manager.getPolicy().isAllowWildcardGroupings();
+ }
+ }
+
+ private String generateQueueName(Binding binding, boolean ignoreFilters)
{
if (ignoreFilters) {
return "federation." + manager.getFederation().getName() +
".policy." + manager.getPolicyName() +
- ".address." + address.getName() +
+ ".address." + addressInfo.getName() +
".node." + manager.server.getNodeID();
} else {
return "federation." + manager.getFederation().getName() +
".policy." + manager.getPolicyName() +
- ".address." + address.getName() +
+ ".address." + addressInfo.getName() +
".filterId." +
generateFilterId(binding.getFilter().getFilterString().toString()) +
".node." + manager.server.getNodeID();
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index 38d0745517..e5cc4f9c10 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -298,6 +298,14 @@ public final class AMQPFederationConstants {
*/
public static final String ADDRESS_ENABLE_DIVERT_BINDINGS =
"enable-divert-bindings";
+ /**
+ * Encodes a boolean value that controls if the address federation should
check the matcher value in the
+ * policy and, if it is a wildcard, create a consumer that indicates its
source is the wildcard address of
+ * the matcher and place the normal address binding under that wildcard but
add a filter to prevent any
+ * messages other than those of its target address from being routed to it.
+ */
+ public static final String ADDRESS_ALLOW_WILDCARD_GROUPINGS =
"allow-wildcard-groupings";
+
/**
* Encodes a {@link Map} of String keys and values that are carried along
in the federation policy (address or
* queue). These values can be used to add extended configuration to the
policy object such as overriding settings
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
index e69a311afd..45d4393828 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
@@ -86,7 +86,7 @@ public class AMQPFederationConsumerControlType extends
AbstractControl implement
}
clearIO();
try {
- return consumerInfo.getAddress();
+ return consumerInfo.getTargetAddress();
} finally {
blockOnIO();
}
@@ -99,7 +99,7 @@ public class AMQPFederationConsumerControlType extends
AbstractControl implement
}
clearIO();
try {
- return consumerInfo.getFqqn();
+ return consumerInfo.getTargetFqqn();
} finally {
blockOnIO();
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
index f9627e1d30..bee2789bb9 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
@@ -20,11 +20,8 @@ package
org.apache.activemq.artemis.protocol.amqp.connect.federation;
import java.util.Objects;
import java.util.UUID;
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
-import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.utils.CompositeAddress;
/**
@@ -37,48 +34,30 @@ public class AMQPFederationGenericConsumerInfo implements
FederationConsumerInfo
public static final String QUEUE_NAME_FORMAT_STRING =
"${address}::${routeType}";
private final Role role;
- private final String address;
+ private final String sourceAddress;
+ private final String sourceFqqn;
+ private final String targetAddress;
+ private final String targetFqqn;
private final String queueName;
private final RoutingType routingType;
private final String filterString;
- private final String fqqn;
private final int priority;
private final String id;
- public AMQPFederationGenericConsumerInfo(Role role, String address, String
queueName, RoutingType routingType,
- String filterString, String fqqn, int
priority) {
+ public AMQPFederationGenericConsumerInfo(Role role, String sourceAddress,
String targetAddress, String queueName,
+ RoutingType routingType, String
filterString, int priority) {
this.role = role;
- this.address = address;
+ this.sourceAddress = sourceAddress;
+ this.targetAddress = targetAddress;
this.queueName = queueName;
this.routingType = routingType;
this.filterString = filterString;
- this.fqqn = fqqn;
+ this.sourceFqqn = CompositeAddress.toFullyQualified(sourceAddress,
queueName);
+ this.targetFqqn = CompositeAddress.toFullyQualified(targetAddress,
queueName);
this.priority = priority;
this.id = UUID.randomUUID().toString();
}
- /**
- * Factory for creating federation address consumer information objects
from server resources.
- *
- * @param address The address being federated, the remote consumer
will be created under this address.
- * @param queueName The name of the remote queue that will be created in
order to route messages here.
- * @param routingType The routing type to assign the remote consumer.
- * @param filterString A filter string used by the federation instance to
limit what enters the remote queue.
- * @param federation The parent {@link Federation} that this federation
consumer is created for
- * @param policy The {@link FederationReceiveFromAddressPolicy} that
triggered this information object to be
- * created.
- * @return a newly created and configured {@link FederationConsumerInfo}
instance
- */
- public static AMQPFederationGenericConsumerInfo build(String address,
String queueName, RoutingType routingType, String filterString, Federation
federation, FederationReceiveFromAddressPolicy policy) {
- return new AMQPFederationGenericConsumerInfo(Role.ADDRESS_CONSUMER,
- address,
- queueName,
- routingType,
- filterString,
-
CompositeAddress.toFullyQualified(address, queueName),
-
ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
- }
-
@Override
public String getId() {
return id;
@@ -95,13 +74,23 @@ public class AMQPFederationGenericConsumerInfo implements
FederationConsumerInfo
}
@Override
- public String getAddress() {
- return address;
+ public String getSourceAddress() {
+ return sourceAddress;
+ }
+
+ @Override
+ public String getTargetAddress() {
+ return targetAddress;
+ }
+
+ @Override
+ public String getSourceFqqn() {
+ return sourceFqqn;
}
@Override
- public String getFqqn() {
- return fqqn;
+ public String getTargetFqqn() {
+ return targetFqqn;
}
@Override
@@ -130,20 +119,22 @@ public class AMQPFederationGenericConsumerInfo implements
FederationConsumerInfo
return role == other.role &&
priority == other.priority &&
- Objects.equals(address, other.address) &&
+ Objects.equals(sourceAddress, other.sourceAddress) &&
+ Objects.equals(targetAddress, other.targetAddress) &&
Objects.equals(queueName, other.queueName) &&
routingType == other.routingType &&
Objects.equals(filterString, other.filterString) &&
- Objects.equals(fqqn, other.fqqn);
+ Objects.equals(sourceFqqn, other.sourceFqqn) &&
+ Objects.equals(targetFqqn, other.targetFqqn);
}
@Override
public int hashCode() {
- return Objects.hash(role, address, queueName, routingType, filterString,
fqqn, priority);
+ return Objects.hash(role, sourceAddress, targetAddress, queueName,
routingType, filterString, sourceFqqn, targetFqqn, priority);
}
@Override
public String toString() {
- return "FederationConsumerInfo: { " + getRole() + ", " + getFqqn() + "}";
+ return "FederationConsumerInfo: { " + getRole() + ", " + getSourceFqqn()
+ "}";
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
index c42f9715c0..fd54fa9502 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
@@ -413,13 +413,13 @@ public abstract class AMQPFederationManagementSupport {
final String policyName = manager.getPolicyName();
if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-
management.registerUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getAddress()),
+
management.registerUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getTargetAddress()),
control,
-
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getAddress()));
+
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getTargetAddress()));
} else {
-
management.registerUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getFqqn()),
+
management.registerUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getTargetFqqn()),
control,
-
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getFqqn()));
+
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getTargetFqqn()));
}
}
@@ -439,11 +439,11 @@ public abstract class AMQPFederationManagementSupport {
final String policyName = manager.getPolicyName();
if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-
management.unregisterUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getAddress()),
-
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getAddress()));
+
management.unregisterUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getTargetAddress()),
+
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getTargetAddress()));
} else {
-
management.unregisterUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getFqqn()),
-
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getFqqn()));
+
management.unregisterUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
federationName, policyName, consumer.getConsumerInfo().getTargetFqqn()),
+
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName,
federationName, manager.getPolicyType().toString(), policyName,
consumer.getConsumerInfo().getTargetFqqn()));
}
}
@@ -575,13 +575,13 @@ public abstract class AMQPFederationManagementSupport {
final String policyName = manager.getPolicyName();
if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-
management.registerUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getAddress()),
+
management.registerUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getTargetAddress()),
control,
-
getFederationTargetAddressConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getAddress()));
+
getFederationTargetAddressConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getTargetAddress()));
} else {
-
management.registerUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getFqqn()),
+
management.registerUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getTargetFqqn()),
control,
-
getFederationTargetQueueConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getFqqn()));
+
getFederationTargetQueueConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getTargetFqqn()));
}
}
@@ -601,11 +601,11 @@ public abstract class AMQPFederationManagementSupport {
final String policyName = manager.getPolicyName();
if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-
management.unregisterUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getAddress()),
-
getFederationTargetAddressConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getAddress()));
+
management.unregisterUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getTargetAddress()),
+
getFederationTargetAddressConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getTargetAddress()));
} else {
-
management.unregisterUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getFqqn()),
-
getFederationTargetQueueConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getFqqn()));
+
management.unregisterUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
brokerConnectionName, federationName, policyName,
consumer.getConsumerInfo().getTargetFqqn()),
+
getFederationTargetQueueConsumerObjectName(management, remoteNodeId,
brokerConnectionName, federationName, manager.getPolicyType().toString(),
policyName, consumer.getConsumerInfo().getTargetFqqn()));
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
index 1bf0494ea5..7212cfdd9a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
@@ -21,6 +21,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -105,6 +106,7 @@ public final class AMQPFederationPolicySupport {
* is created as these values are used to indicate no max hops for
federated messages on an address.
*
* @param maxHops The max allowed number of hops before a message should
stop crossing federation links.
+ *
* @return the address filter string that should be applied (or null)
*/
public static String generateAddressFilter(int maxHops) {
@@ -125,6 +127,7 @@ public final class AMQPFederationPolicySupport {
* {@link FederationReceiveFromQueuePolicy}.
*
* @param policy The policy to encode into an AMQP message.
+ *
* @return an AMQP Message with the encoded policy
*/
public static AMQPMessage
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
@@ -194,6 +197,7 @@ public final class AMQPFederationPolicySupport {
* {@link FederationReceiveFromAddressPolicy}.
*
* @param policy The policy to encode into an AMQP message.
+ *
* @return an AMQP Message with the encoded policy
*/
public static AMQPMessage
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
@@ -211,6 +215,7 @@ public final class AMQPFederationPolicySupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT,
policy.getAutoDeleteMessageCount());
policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS,
policy.isEnableDivertBindings());
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS,
policy.isAllowWildcardGroupings());
if (!policy.getIncludes().isEmpty()) {
policyMap.put(ADDRESS_INCLUDES, new
ArrayList<>(policy.getIncludes()));
}
@@ -250,11 +255,16 @@ public final class AMQPFederationPolicySupport {
/**
* Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy}
from it and return the decoded value. The
* message should have already been inspected and determined to be an
control message of the add to policy type.
+ * The decoder ignores elements in the message that it does not know about
so that future configuration additions do
+ * not break previous broker instances.
+ *
+ * @param message
+ * The {@link AMQPMessage} that should carry an encoded {@link
FederationReceiveFromQueuePolicy}
+ * @param wildcardConfig
+ * The {@link WildcardConfiguration} to use in the decoded policy.
*
- * @param message The {@link AMQPMessage} that should carry an
encoded
- * {@link FederationReceiveFromQueuePolicy}
- * @param wildcardConfig The {@link WildcardConfiguration} to use in the
decoded policy.
* @return a decoded {@link FederationReceiveFromQueuePolicy} instance
+ *
* @throws ActiveMQException if an error occurs while decoding the policy.
*/
@SuppressWarnings("unchecked")
@@ -359,11 +369,16 @@ public final class AMQPFederationPolicySupport {
/**
* Given an AMQP Message decode an {@link
FederationReceiveFromAddressPolicy} from it and return the decoded value.
* The message should have already been inspected and determined to be an
control message of the add to policy type.
+ * The decoder ignores elements in the message that it does not know about
so that future configuration additions do
+ * not break previous broker instances.
+ *
+ * @param message
+ * The {@link AMQPMessage} that should carry an encoded {@link
FederationReceiveFromAddressPolicy}
+ * @param wildcardConfig
+ * The {@link WildcardConfiguration} to use in the decoded policy.
*
- * @param message The {@link AMQPMessage} that should carry an
encoded
- * {@link FederationReceiveFromQueuePolicy}
- * @param wildcardConfig The {@link WildcardConfiguration} to use in the
decoded policy.
* @return a decoded {@link FederationReceiveFromAddressPolicy} instance
+ *
* @throws ActiveMQException if an error occurs during the policy decode.
*/
@SuppressWarnings("unchecked")
@@ -399,6 +414,7 @@ public final class AMQPFederationPolicySupport {
final long autoDeleteMsgCount = ((Number)
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
final int maxHops = ((Number)
policyMap.get(ADDRESS_MAX_HOPS)).intValue();
final boolean enableDiverts = (Boolean)
policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ final boolean allowWildcardGroupings = (Boolean)
policyMap.getOrDefault(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
final Set<String> includes;
final Set<String> excludes;
@@ -435,8 +451,8 @@ public final class AMQPFederationPolicySupport {
return new FederationReceiveFromAddressPolicy(policyName, autoDelete,
autoDeleteDelay,
autoDeleteMsgCount,
maxHops, enableDiverts,
- includes, excludes,
properties, transformerConfig,
- wildcardConfig);
+ allowWildcardGroupings,
includes, excludes,
+ properties,
transformerConfig, wildcardConfig);
} catch (Exception e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
"Invalid encoded address policy entry: " + e.getMessage());
@@ -447,8 +463,11 @@ public final class AMQPFederationPolicySupport {
* From the broker AMQP broker connection configuration element and the
configured wild-card settings create an
* address match policy.
*
- * @param element The broker connections element configuration that
creates this policy.
- * @param wildcards The configured wild-card settings for the broker or
defaults.
+ * @param element
+ * The broker connections configuration that creates this federation
address policy.
+ * @param wildcards
+ * The configured wild-card settings for the broker or defaults.
+ *
* @return a new address match and handling policy for use in the broker
connection
*/
public static FederationReceiveFromAddressPolicy
create(AMQPFederationAddressPolicyElement element, WildcardConfiguration
wildcards) {
@@ -481,6 +500,7 @@ public final class AMQPFederationPolicySupport {
Objects.requireNonNullElse(element.getAutoDeleteMessageCount(), 0L),
element.getMaxHops(),
Objects.requireNonNullElse(element.isEnableDivertBindings(), false),
+ Objects.requireNonNullElse(element.isAllowWildcardGroupings(), false),
includes,
excludes,
element.getProperties(),
@@ -496,8 +516,11 @@ public final class AMQPFederationPolicySupport {
* in order to attempt to prevent federation consumers from consuming
messages on the remote when a local consumer is
* present.
*
- * @param element The broker connections element configuration that
creates this policy.
- * @param wildcards The configured wild-card settings for the broker or
defaults.
+ * @param element
+ * The broker connections configuration that creates this federation
queue policy.
+ * @param wildcards
+ * The configured wild-card settings for the broker or defaults.
+ *
* @return a new queue match and handling policy for use in the broker
connection
*/
public static FederationReceiveFromQueuePolicy
create(AMQPFederationQueuePolicyElement element, WildcardConfiguration
wildcards) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index b80b0d218e..3f796de9c9 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -87,7 +87,7 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
private String generateLinkName() {
return "federation-" + federation.getName() +
"-policy-" + policy.getPolicyName() +
- "-queue-receiver-" + consumerInfo.getFqqn() +
+ "-queue-receiver-" + consumerInfo.getTargetFqqn() +
"-" + federation.getServer().getNodeID() + ":" +
LINK_SEQUENCE_ID.getAndIncrement();
}
@@ -103,7 +103,6 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
final Receiver protonReceiver =
session.getSession().receiver(generateLinkName());
final Target target = new Target();
final Source source = new Source();
- final String address = consumerInfo.getFqqn();
final Queue localQueue =
federation.getServer().locateQueue(consumerInfo.getQueueName());
if (RoutingType.ANYCAST.equals(consumerInfo.getRoutingType())) {
@@ -116,7 +115,7 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
source.setDefaultOutcome(DEFAULT_OUTCOME);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- source.setAddress(address);
+ source.setAddress(consumerInfo.getSourceFqqn());
if (consumerInfo.getFilterString() != null &&
!consumerInfo.getFilterString().isEmpty()) {
final AmqpJmsSelectorFilter jmsFilter = new
AmqpJmsSelectorFilter(consumerInfo.getFilterString());
@@ -126,7 +125,7 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
source.setFilter(filtersMap);
}
- target.setAddress(address);
+ target.setAddress(consumerInfo.getTargetFqqn());
final Map<Symbol, Object> receiverProperties = new HashMap<>();
receiverProperties.put(FEDERATION_RECEIVER_PRIORITY,
consumerInfo.getPriority());
@@ -248,7 +247,7 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
super(session.getSessionSPI(), session.getAMQPConnectionContext(),
session, receiver);
this.localQueue = localQueue;
- this.cachedFqqn = SimpleString.of(consumerInfo.getFqqn());
+ this.cachedFqqn = SimpleString.of(consumerInfo.getTargetFqqn());
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
index 041e75a114..5aeb8b137a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
@@ -45,7 +45,6 @@ import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFro
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
-import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -292,11 +291,11 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
final String filterString = selectFilter(consumer);
return new AMQPFederationGenericConsumerInfo(Role.QUEUE_CONSUMER,
+ address, // Source and
target address are the same for Queue consumers
address,
queueName,
queue.getRoutingType(),
filterString,
-
CompositeAddress.toFullyQualified(address, queueName),
priority);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
index 87f37560ea..69af9ac475 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
@@ -59,21 +59,38 @@ public interface FederationConsumerInfo {
String getQueueName();
/**
- * Gets the address that will be used for this federation consumer instance.
+ * Gets the address that will be the source of messages for the federation
consumer instance.
* <p>
* For Queue federation this is the address under which the matching queue
must reside. For Address federation this
- * is the actual address whose messages are being federated.
+ * is the actual address whose messages are being federated from on the
remote broker.
*
- * @return the address associated with this federation consumer
+ * @return the source address associated with this federation consumer
*/
- String getAddress();
+ String getSourceAddress();
+
+ /**
+ * Gets the address that will be the target of messages for the federation
consumer instance.
+ * <p>
+ * For Queue federation this is the address under which the matching queue
must reside. For Address federation this
+ * is the actual address whose messages are being federated to on the local
broker.
+ *
+ * @return the target address associated with this federation consumer
+ */
+ String getTargetAddress();
/**
* Gets the FQQN that comprises the address and queue where the remote
consumer will be attached.
*
- * @return provides the FQQN that can be used to address the consumer queue
directly
+ * @return provides the FQQN that can be used to address the remote
consumer queue directly
+ */
+ String getSourceFqqn();
+
+ /**
+ * Gets the FQQN that comprises the address and queue where the local
sender will be attached.
+ *
+ * @return provides the FQQN that can be used to address the local sender
queue directly
*/
- String getFqqn();
+ String getTargetFqqn();
/**
* Gets the routing type that will be requested when creating a consumer on
the remote server.
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
index 8620b67610..9c69395d68 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
@@ -52,11 +52,12 @@ public class FederationReceiveFromAddressPolicy implements
FederationReceiveFrom
private final long autoDeleteMessageCount;
private final int maxHops;
private final boolean enableDivertBindings;
+ private final boolean allowWildcardGroupings;
private final Map<String, Object> properties;
private final TransformerConfiguration transformerConfig;
- public FederationReceiveFromAddressPolicy(String name, boolean autoDelete,
long autoDeleteDelay,
- long autoDeleteMessageCount, int
maxHops, boolean enableDivertBindings,
+ public FederationReceiveFromAddressPolicy(String name, boolean autoDelete,
long autoDeleteDelay, long autoDeleteMessageCount,
+ int maxHops, boolean
enableDivertBindings, boolean allowWildcardGroupings,
Collection<String>
includeAddresses, Collection<String> excludeAddresses,
Map<String, Object> properties,
TransformerConfiguration transformerConfig,
WildcardConfiguration
wildcardConfig) {
@@ -69,6 +70,7 @@ public class FederationReceiveFromAddressPolicy implements
FederationReceiveFrom
this.autoDeleteMessageCount = autoDeleteMessageCount;
this.maxHops = maxHops;
this.enableDivertBindings = enableDivertBindings;
+ this.allowWildcardGroupings = allowWildcardGroupings;
this.transformerConfig = transformerConfig;
this.includes =
Collections.unmodifiableCollection(Objects.requireNonNullElse(includeAddresses,
Collections.emptyList()));
this.excludes =
Collections.unmodifiableCollection(Objects.requireNonNullElse(excludeAddresses,
Collections.emptyList()));
@@ -114,6 +116,10 @@ public class FederationReceiveFromAddressPolicy implements
FederationReceiveFrom
return enableDivertBindings;
}
+ public boolean isAllowWildcardGroupings() {
+ return allowWildcardGroupings;
+ }
+
public Collection<String> getIncludes() {
return includes;
}
@@ -137,6 +143,7 @@ public class FederationReceiveFromAddressPolicy implements
FederationReceiveFrom
* {@link SimpleString} object or any null checks.
*
* @param addressInfo The address info to check which if null will result
in a negative result.
+ *
* @return {@code true} if the address value matches this configured policy
*/
public boolean test(AddressInfo addressInfo) {
@@ -147,6 +154,18 @@ public class FederationReceiveFromAddressPolicy implements
FederationReceiveFrom
}
}
+ /**
+ * Test method that accepts the raw address name and its routing type and
checks the set of
+ * includes and excludes to determine if the address matches any configured
matchers in this
+ * policy.
+ *
+ * @param address
+ * The address being tested for a match against this policy.
+ * @param type
+ * The routing type associated with the address under test.
+ *
+ * @return <code>true</code> if the address matches this policy's
configuration.
+ */
@Override
public boolean test(String address, RoutingType type) {
if (RoutingType.MULTICAST.equals(type)) {
@@ -166,18 +185,56 @@ public class FederationReceiveFromAddressPolicy
implements FederationReceiveFrom
return false;
}
+ /**
+ * Returns the first matcher string that returns a positive match with the
given address.
+ * Excludes are checked in order to prevent an inconsistency between what
the test methods
+ * return and what this method returns. If test would have failed this
method will also fail
+ * to match and will return a <code>null</code> string.
+ *
+ * @param address
+ * The address being tested for a match against this policy.
+ * @param type
+ * The routing type associated with the address under test.
+ *
+ * @return the resulting first matcher to match on the given address.
+ */
+ public String getFirstMatchingAddressPattern(String address, RoutingType
type) {
+ if (RoutingType.MULTICAST.equals(type)) {
+ for (AddressMatcher matcher : excludesMatchers) {
+ if (matcher.test(address)) {
+ return null;
+ }
+ }
+
+ for (AddressMatcher matcher : includesMatchers) {
+ if (matcher.test(address)) {
+ return matcher.getMatcherPattern();
+ }
+ }
+ }
+
+ return null;
+ }
+
private static class AddressMatcher implements Predicate<String> {
+ private String matchPattern;
private final Predicate<String> matcher;
AddressMatcher(String address, WildcardConfiguration wildcardConfig) {
if (address == null || address.isEmpty()) {
+ matchPattern = wildcardConfig.getAnyWordsString();
matcher = (target) -> true;
} else {
+ matchPattern = address;
matcher = new Match<>(address, null,
wildcardConfig).getPattern().asPredicate();
}
}
+ public String getMatcherPattern() {
+ return matchPattern;
+ }
+
@Override
public boolean test(String address) {
return matcher.test(address);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
index 1868c0de5e..91e13b82bd 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
@@ -57,6 +57,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -224,13 +225,13 @@ public class AMQPFederationPolicySupportTest {
properties2.put("amqpCredits", 10);
properties2.put("amqpLowCredits", 3);
- doTestEncodeReceiveFromAddressPolicy("test", false, 0, 1, 2, true,
includes, excludes, properties1);
- doTestEncodeReceiveFromAddressPolicy("test", true, 1, 3, 2, false,
includes, excludes, null);
- doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false,
includes, excludes, properties2);
- doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
includes, excludes, null);
- doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false,
null, excludes, properties2);
- doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
includes, null, null);
- doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+ doTestEncodeReceiveFromAddressPolicy("test", false, 0, 1, 2, true,
false, includes, excludes, properties1);
+ doTestEncodeReceiveFromAddressPolicy("test", true, 1, 3, 2, false, true,
includes, excludes, null);
+ doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false,
false, includes, excludes, properties2);
+ doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
true, includes, excludes, null);
+ doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false,
true, null, excludes, properties2);
+ doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
false, includes, null, null);
+ doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true,
true, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
}
@SuppressWarnings("unchecked")
@@ -240,12 +241,13 @@ public class AMQPFederationPolicySupportTest {
long
autoDeleteMessageCount,
int maxHops,
boolean
enableDivertBindings,
+ boolean
allowWildcardGroupings,
Collection<String>
includes,
Collection<String>
excludes,
Map<String, Object>
policyProperties) {
final FederationReceiveFromAddressPolicy policy = new
FederationReceiveFromAddressPolicy(
- name, autoDelete, autoDeleteDelay, autoDeleteMessageCount, maxHops,
- enableDivertBindings, includes, excludes, policyProperties, null,
DEFAULT_WILDCARD_CONFIGURATION);
+ name, autoDelete, autoDeleteDelay, autoDeleteMessageCount, maxHops,
enableDivertBindings,
+ allowWildcardGroupings, includes, excludes, policyProperties, null,
DEFAULT_WILDCARD_CONFIGURATION);
final AMQPMessage message =
AMQPFederationPolicySupport.encodeAddressPolicyControlMessage(policy);
@@ -263,6 +265,7 @@ public class AMQPFederationPolicySupportTest {
assertEquals(autoDeleteMessageCount,
policyMap.get(ADDRESS_AUTO_DELETE_MSG_COUNT));
assertEquals(maxHops, policyMap.get(ADDRESS_MAX_HOPS));
assertEquals(enableDivertBindings,
policyMap.get(ADDRESS_ENABLE_DIVERT_BINDINGS));
+ assertEquals(allowWildcardGroupings,
policyMap.get(ADDRESS_ALLOW_WILDCARD_GROUPINGS));
if (includes == null || includes.isEmpty()) {
assertFalse(policyMap.containsKey(ADDRESS_INCLUDES));
@@ -420,11 +423,11 @@ public class AMQPFederationPolicySupportTest {
properties.put("amqpCredits", "10");
properties.put("amqpLowCredits", "3");
- doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, includes, excludes, null);
- doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, includes, excludes, properties);
- doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, null, excludes, null);
- doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, includes, null, properties);
- doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+ doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, false, includes, excludes, null);
+ doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, true, includes, excludes, properties);
+ doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, true, null, excludes, null);
+ doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, false, includes, null, properties);
+ doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2,
true, true, Collections.emptyList(), Collections.emptyList(),
Collections.emptyMap());
}
private void doTestDecodeReceiveFromAddressPolicy(String address, String
name,
@@ -433,6 +436,7 @@ public class AMQPFederationPolicySupportTest {
long
autoDeleteMessageCount,
int maxHops,
boolean
enableDivertBindings,
+ boolean
allowWildcardGroupings,
Collection<String>
includes,
Collection<String>
excludes,
Map<String, String>
policyProperties) throws ActiveMQException {
@@ -453,6 +457,7 @@ public class AMQPFederationPolicySupportTest {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, autoDeleteMessageCount);
policyMap.put(ADDRESS_MAX_HOPS, maxHops);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, enableDivertBindings);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, allowWildcardGroupings);
if (includes != null && !includes.isEmpty()) {
policyMap.put(ADDRESS_INCLUDES, new ArrayList<>(includes));
@@ -469,8 +474,8 @@ public class AMQPFederationPolicySupportTest {
final FederationReceiveFromAddressPolicy policy =
AMQPFederationPolicySupport.decodeReceiveFromAddressPolicy(amqpMessage,
DEFAULT_WILDCARD_CONFIGURATION);
- checkPolicyMatchesExpectations(policy, name, autoDelete,
autoDeleteDelay, autoDeleteMessageCount,
- maxHops, enableDivertBindings, includes,
excludes, policyProperties);
+ checkPolicyMatchesExpectations(policy, name, autoDelete,
autoDeleteDelay, autoDeleteMessageCount, maxHops,
+ enableDivertBindings,
allowWildcardGroupings, includes, excludes, policyProperties);
}
@Test
@@ -572,12 +577,12 @@ public class AMQPFederationPolicySupportTest {
properties.put("amqpCredits", "10");
properties.put("amqpLowCredits", "3");
- doTestCreateAddressPolicyFromConfigurationElement("test", false, 0, 1,
2, true, includes, excludes, null);
- doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 2, 3,
true, includes, excludes, properties);
- doTestCreateAddressPolicyFromConfigurationElement("test", false, 10, 9,
8, false, null, excludes, properties);
- doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 1, 1,
false, includes, null, null);
- doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1,
1, true, null, null, properties);
- doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1,
1, true, Collections.emptySet(), Collections.emptySet(),
Collections.emptyMap());
+ doTestCreateAddressPolicyFromConfigurationElement("test", false, 0, 1,
2, true, false, includes, excludes, null);
+ doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 2, 3,
true, true, includes, excludes, properties);
+ doTestCreateAddressPolicyFromConfigurationElement("test", false, 10, 9,
8, false, true, null, excludes, properties);
+ doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 1, 1,
false, false, includes, null, null);
+ doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1,
1, true, true, null, null, properties);
+ doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1,
1, true, false, Collections.emptySet(), Collections.emptySet(),
Collections.emptyMap());
}
private void doTestCreateAddressPolicyFromConfigurationElement(String name,
@@ -586,6 +591,7 @@ public class AMQPFederationPolicySupportTest {
long
autoDeleteMessageCount,
int maxHops,
boolean
enableDivertBindings,
+ boolean
allowWildcardGroupings,
Collection<String> includes,
Collection<String> excludes,
Map<String,
Object> policyProperties) throws ActiveMQException {
@@ -598,6 +604,7 @@ public class AMQPFederationPolicySupportTest {
element.setAutoDeleteMessageCount(autoDeleteMessageCount);
element.setMaxHops(maxHops);
element.setEnableDivertBindings(enableDivertBindings);
+ element.setAllowWildcardGroupings(allowWildcardGroupings);
element.setProperties(policyProperties);
if (includes != null) {
@@ -611,12 +618,13 @@ public class AMQPFederationPolicySupportTest {
final FederationReceiveFromAddressPolicy policy =
AMQPFederationPolicySupport.create(element, DEFAULT_WILDCARD_CONFIGURATION);
checkPolicyMatchesExpectations(policy, name, autoDelete,
autoDeleteDelay, autoDeleteMessageCount, maxHops,
- enableDivertBindings, includes, excludes,
policyProperties);
+ enableDivertBindings,
allowWildcardGroupings, includes, excludes, policyProperties);
}
private void
checkPolicyMatchesExpectations(FederationReceiveFromAddressPolicy policy,
String name, boolean
autoDelete, long autoDeleteDelay,
- long autoDeleteMessageCount,
int maxHops, boolean enableDivertBindings,
+ long autoDeleteMessageCount,
int maxHops,
+ boolean enableDivertBindings,
boolean enableWildcardSubscriptions,
Collection<?> includes,
Collection<?> excludes,
Map<String, ?>
policyProperties) {
assertEquals(name, policy.getPolicyName());
@@ -625,6 +633,7 @@ public class AMQPFederationPolicySupportTest {
assertEquals(autoDeleteMessageCount, policy.getAutoDeleteMessageCount());
assertEquals(maxHops, policy.getMaxHops());
assertEquals(enableDivertBindings, policy.isEnableDivertBindings());
+ assertEquals(enableWildcardSubscriptions,
policy.isAllowWildcardGroupings());
if (includes == null || includes.isEmpty()) {
assertTrue(policy.getIncludes().isEmpty());
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
index 6ee4351fb7..6f3ef9b64c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
@@ -39,6 +39,7 @@ public final class AMQPFederationAddressPolicyElement
implements Serializable {
private Long autoDeleteMessageCount;
private int maxHops;
private Boolean enableDivertBindings;
+ private Boolean allowWildcardGroupings;
private TransformerConfiguration transformerConfig;
public String getName() {
@@ -173,6 +174,15 @@ public final class AMQPFederationAddressPolicyElement
implements Serializable {
return transformerConfig;
}
+ public Boolean isAllowWildcardGroupings() {
+ return allowWildcardGroupings;
+ }
+
+ public AMQPFederationAddressPolicyElement setAllowWildcardGroupings(Boolean
allowWildcardGroupings) {
+ this.allowWildcardGroupings = allowWildcardGroupings;
+ return this;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -190,14 +200,15 @@ public final class AMQPFederationAddressPolicyElement
implements Serializable {
Objects.equals(autoDeleteDelay, other.autoDeleteDelay) &&
Objects.equals(autoDeleteMessageCount,
other.autoDeleteMessageCount) &&
Objects.equals(enableDivertBindings, other.enableDivertBindings)
&&
+ Objects.equals(allowWildcardGroupings,
other.allowWildcardGroupings) &&
Objects.equals(transformerConfig, other.transformerConfig) &&
maxHops == other.maxHops;
}
@Override
public int hashCode() {
- return Objects.hash(name, includes, excludes, properties, autoDelete,
autoDeleteDelay,
- autoDeleteMessageCount, maxHops,
enableDivertBindings, transformerConfig);
+ return Objects.hash(name, includes, excludes, properties, autoDelete,
autoDeleteDelay, autoDeleteMessageCount,
+ maxHops, enableDivertBindings,
allowWildcardGroupings, transformerConfig);
}
// We are required to implement a named match type so that we can perform
this configuration
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 7927aa6b73..580355d4ac 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2349,6 +2349,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setAutoDeleteMessageCount(MINUS_ONE_OR_GE_ZERO.validate("auto-delete-message-count",
Long.parseLong(item.getNodeValue())).longValue());
} else if (item.getNodeName().equals("enable-divert-bindings")) {
config.setEnableDivertBindings(Boolean.parseBoolean(item.getNodeValue()));
+ } else if (item.getNodeName().equals("allow-wildcard-groupings")) {
+
config.setAllowWildcardGroupings(Boolean.parseBoolean(item.getNodeValue()));
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 63c6cf5321..2af3e1b144 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2462,6 +2462,7 @@
<xsd:attribute name="max-hops" type="xsd:int" use="optional" />
<xsd:attribute name="name" type="xsd:ID" use="required" />
<xsd:attribute name="enable-divert-bindings" type="xsd:boolean"
use="optional" />
+ <xsd:attribute name="allow-wildcard-groupings" type="xsd:boolean"
use="optional" />
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
index f8fab0db19..28457ee8e3 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
@@ -120,5 +120,13 @@ public class AMQPFederationAddressPolicyElementTest {
config2.setMaxHops(10);
assertEquals(config1, config2);
assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Allow receivers to nest under remote wildcard addresses
+ config1.setAllowWildcardGroupings(true);
+ assertNotEquals(config1, config2);
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setAllowWildcardGroupings(true);
+ assertEquals(config1, config2);
+ assertEquals(config1.hashCode(), config2.hashCode());
}
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 8b2bf1f4ad..fd4f1fc701 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -469,6 +469,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
insertionOrderedProperties.put("AMQPConnections.target.federations.abc."
+ policyType + ".policy2.includes.m4.addressMatch", "y");
insertionOrderedProperties.put("AMQPConnections.target.federations.abc."
+ policyType + ".policy2.excludes.m5.addressMatch", "z");
insertionOrderedProperties.put("AMQPConnections.target.federations.abc."
+ policyType + ".policy2.enableDivertBindings", "true");
+ insertionOrderedProperties.put("AMQPConnections.target.federations.abc."
+ policyType + ".policy2.allowWildcardGroupings", "true");
insertionOrderedProperties.put("AMQPConnections.target.federations.abc."
+ policyType + ".policy2.properties.a", "b");
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
@@ -517,6 +518,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
assertEquals(42L,
addressPolicy1.getAutoDeleteMessageCount().longValue());
assertEquals(10000L, addressPolicy1.getAutoDeleteDelay().longValue());
assertNull(addressPolicy1.isEnableDivertBindings());
+ assertNull(addressPolicy1.isAllowWildcardGroupings());
assertTrue(addressPolicy1.getProperties().isEmpty());
addressPolicy1.getIncludes().forEach(match -> {
@@ -536,6 +538,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
assertNull(addressPolicy2.getAutoDeleteMessageCount());
assertNull(addressPolicy2.getAutoDeleteDelay());
assertTrue(addressPolicy2.isEnableDivertBindings());
+ assertTrue(addressPolicy2.isAllowWildcardGroupings());
assertFalse(addressPolicy2.getProperties().isEmpty());
assertEquals("b", addressPolicy2.getProperties().get("a"));
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 88a7619c8f..ba33b5a739 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -501,7 +501,7 @@
<property key="amqpCredits" value="2"/>
<property key="amqpLowCredits" value="1"/>
</remote-queue-policy>
- <local-address-policy name="lap1" auto-delete="false"
auto-delete-delay="1" auto-delete-message-count="12" max-hops="2"
enable-divert-bindings="true">
+ <local-address-policy name="lap1" auto-delete="false"
auto-delete-delay="1" auto-delete-message-count="12" max-hops="2"
enable-divert-bindings="true" allow-wildcard-groupings="true">
<include address-match="orders" />
<exclude address-match="all.#" />
<transformer-class-name>class-name</transformer-class-name>
@@ -510,7 +510,7 @@
<include address-match="address" queue-match="theQueue" />
<transformer-class-name>class-another</transformer-class-name>
</local-queue-policy>
- <remote-address-policy name="rap1" auto-delete="true"
auto-delete-delay="2" auto-delete-message-count="42" max-hops="1"
enable-divert-bindings="false">
+ <remote-address-policy name="rap1" auto-delete="true"
auto-delete-delay="2" auto-delete-message-count="42" max-hops="1"
enable-divert-bindings="false" allow-wildcard-groupings="false">
<include address-match="support" />
<property key="amqpCredits" value="2"/>
<property key="amqpLowCredits" value="1"/>
diff --git a/docs/user-manual/amqp-federation-configuration-glossary.adoc
b/docs/user-manual/amqp-federation-configuration-glossary.adoc
index 3a12309981..9f5be0b09c 100644
--- a/docs/user-manual/amqp-federation-configuration-glossary.adoc
+++ b/docs/user-manual/amqp-federation-configuration-glossary.adoc
@@ -92,6 +92,7 @@ In this case the policy is sent across the connection to the
remote broker and f
auto-delete-delay="30000"
auto-delete-message-count="0"
max-hops="1"
+ allow-wildcard-groupings="false"
enable-divert-bindings="true">
<include address-match="local-address.#" />
<exclude address-match="local-address.excluded" />
@@ -127,6 +128,11 @@ enable-divert-bindings::
Setting to `true` enables divert bindings to be listened-to for demand.
If a divert binding with an address matches the included addresses for the
address policy, any queue bindings that match the forwarding address of the
divert creates demand.
The default value is `false`.
+allow-wildcard-groupings::
+Setting to `true` allows an address federation policy match that is itself a
wildcard to use the remote wildcard address to host the address receiver
bindings for the addresses it matches.
+This can be useful when federating large numbers of addresses that have low
traffic, as it reduces the number of addresses that need to exist on the remote
federation target, at the expense of higher CPU load on the target in order to
route messages to the appropriate wildcard bindings for the matched addresses.
+The default value is `false`, in which case the federation receiver address
bindings are attached to the matching addresses on the remote broker.
+When enabled, the remote broker must have wildcard routing enabled for this
feature to work correctly.
The following set of elements can be configured within the address policy to
control the behavior of the deployed policy.
For the policy to actually match any addresses on the target broker an include
must be provided at a minimum.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 71b8d48117..1cd0c1fefd 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -22,6 +22,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -106,6 +107,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
@@ -2035,7 +2037,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, excludes, null, null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2081,7 +2083,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, excludes, null,
transformerConfiguration,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2119,7 +2121,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2189,7 +2191,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2261,7 +2263,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2335,7 +2337,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -2414,7 +2416,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
transformerConfiguration,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -3531,6 +3533,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
final EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher(policyMap);
@@ -3617,6 +3620,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
final EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher(policyMap);
@@ -6705,7 +6709,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, -1, true,
+ true, 30_000L, 1000L, -1,
true, false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -7063,7 +7067,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, false,
+ true, 30_000L, 1000L, 1,
false, false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -7169,7 +7173,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final FederationReceiveFromAddressPolicy policy =
new FederationReceiveFromAddressPolicy("test-address-policy",
- true, 30_000L, 1000L, 1, true,
+ true, 30_000L, 1000L, 1, true,
false,
includes, null, properties,
null,
DEFAULT_WILDCARD_CONFIGURATION);
@@ -7546,6 +7550,493 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void testWildcardAddressUedForFederationSubscriptionWhenEnabled()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final String wildcardAddress = getTestName() + ".#";
+ final String addressA = getTestName() + ".A";
+ final String addressB = getTestName() + ".B";
+
+ final String expectedJMSFilterA = "AMQAddress='" + addressA + "'";
+ final String expectedJMSFilterB = "AMQAddress='" + addressB + "'";
+ final AtomicReference<Attach> capturedAttachA = new
AtomicReference<>();
+ final AtomicReference<Attach> capturedAttachB = new
AtomicReference<>();
+
+ final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+ final
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong
jmsSelectorCode =
+
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(wildcardAddress);
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+ receiveFromAddress.setAllowWildcardGroupings(true);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // A first binding on the wildcard address that collects only
messages for address A
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withCapture(attach -> capturedAttachA.set(attach))
+ .withName(allOf(containsString(getTestName()),
+ containsString(addressA),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withSource().withAddress(startsWith(wildcardAddress)).also()
+ .withTarget().withAddress(addressA).also()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumerA =
session.createConsumer(session.createTopic(addressA));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // A second binding on the wildcard address that collects only
messages for address B
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withCapture(attach ->
capturedAttachB.set(attach))
+ .withName(allOf(containsString(getTestName()),
+ containsString(addressB),
+
containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withSource().withAddress(startsWith(wildcardAddress)).also()
+ .withTarget().withAddress(addressB).also()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final MessageConsumer consumerB =
session.createConsumer(session.createTopic(addressB));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final Map<Symbol, Object> filtersMapA =
capturedAttachA.get().getSource().getFilter();
+
+ assertNotNull(filtersMapA);
+ assertTrue(filtersMapA.containsKey(jmsSelectorKey));
+ final DescribedType jmsSelectorEntryA = (DescribedType)
filtersMapA.get(jmsSelectorKey);
+ assertNotNull(jmsSelectorEntryA);
+ assertEquals(jmsSelectorEntryA.getDescriptor(), jmsSelectorCode);
+ assertEquals(jmsSelectorEntryA.getDescribed().toString(),
expectedJMSFilterA);
+
+ final Map<Symbol, Object> filtersMapB =
capturedAttachB.get().getSource().getFilter();
+
+ assertNotNull(filtersMapB);
+ assertTrue(filtersMapB.containsKey(jmsSelectorKey));
+ final DescribedType jmsSelectorEntryB = (DescribedType)
filtersMapB.get(jmsSelectorKey);
+ assertNotNull(jmsSelectorEntryB);
+ assertEquals(jmsSelectorEntryB.getDescriptor(), jmsSelectorCode);
+ assertEquals(jmsSelectorEntryB.getDescribed().toString(),
expectedJMSFilterB);
+
+ // Route some messages to check they arrive on the local consumers
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(0)
+ .withBody().withValue("Address A").also()
+ .now();
+ peer.remoteTransfer().withHandle(3)
+ .withDeliveryId(1)
+ .withBody().withValue("Address B").also()
+ .later(10);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final TextMessage messageA = (TextMessage)
consumerA.receiveNoWait();
+ final TextMessage messageB = (TextMessage)
consumerB.receiveNoWait();
+
+ assertNotNull(messageA);
+ assertNotNull(messageB);
+
+ assertEquals("Address A", messageA.getText());
+ assertEquals("Address B", messageB.getText());
+
+ assertNull(consumerA.receiveNoWait());
+ assertNull(consumerB.receiveNoWait());
+
+ peer.close();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFallbackToLegacySubscriptionIfNoFQQNSupportAndWildcardSubscriptionsGroupsEnabled()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond() // Respond with no V2 property
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final String wildcardAddress = getTestName() + ".#";
+ final String addressA = getTestName() + ".A";
+ final String addressB = getTestName() + ".B";
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(wildcardAddress);
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+ receiveFromAddress.setAllowWildcardGroupings(true);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // Binding on the matching address since no FQQN support
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString(addressA),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .withSource().withAddress(addressA).also()
+ .withTarget().withAddress(addressA).also()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumerA =
session.createConsumer(session.createTopic(addressA));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // Binding on the matching address since no FQQN support
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString(addressB),
+
containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .withSource().withAddress(addressB).also()
+ .withTarget().withAddress(addressB).also()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final MessageConsumer consumerB =
session.createConsumer(session.createTopic(addressB));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Route some messages to check they arrive on the local consumers
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(0)
+ .withBody().withValue("Address A").also()
+ .now();
+ peer.remoteTransfer().withHandle(3)
+ .withDeliveryId(1)
+ .withBody().withValue("Address B").also()
+ .later(10);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final TextMessage messageA = (TextMessage)
consumerA.receiveNoWait();
+ final TextMessage messageB = (TextMessage)
consumerB.receiveNoWait();
+
+ assertNotNull(messageA);
+ assertNotNull(messageB);
+
+ assertEquals("Address A", messageA.getText());
+ assertEquals("Address B", messageB.getText());
+
+ assertNull(consumerA.receiveNoWait());
+ assertNull(consumerB.receiveNoWait());
+
+ peer.close();
+ }
+ }
+ }
+ /**
+  * Checks that allowing wildcard groupings on the address policy does not change
+  * how demand on an exact (non-wildcard) include is federated: the scripted peer
+  * expects a receiver attach whose source address starts with the test name
+  * followed by "::" and whose target address is the test name itself.
+  */
+ @Test
+ @Timeout(20)
+ public void
testFederationTreatsNonWildcardMatchAsNormalWhenWildcardSubscriptionsSetToAllowed()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond() // Need V2 for wildcard subscription to
even occur
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+ // Build the federation configuration that connects the broker to the scripted peer
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+ // Exact include for the test name plus two wildcard includes
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+ receiveFromAddress.addToIncludes("unmatched.a.#");
+ receiveFromAddress.addToIncludes("unmatched.*.b");
+ receiveFromAddress.setAllowWildcardGroupings(true); // Allowed but
match will not be on any of the wildcards
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+ // Script the receiver attach that is waited on after the consumer below is created
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .withSource().withAddress(startsWith(getTestName()
+ "::")).also() // FQQN Address exact match
+ .withTarget().withAddress(getTestName()).also()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+ /**
+  * Starts the broker with wildcard routing disabled, has a scripted client peer
+  * attach a federation address receiver whose source is a wildcard address FQQN,
+  * then produces a message to a concrete address under that wildcard. No transfer
+  * is scripted on the peer; per the inline notes the message is not expected to
+  * reach the federation receiver in this mis-configured setup.
+  */
+ @Test
+ @Timeout(20)
+ public void
testFederationAttachesToRemoteBrokerWithWildcardRoutingDisabled() throws
Exception {
+ // Disable wildcard routing to check / demonstrate that mis-configured
federation topologies
+ // can fail to federate if wild card subscription grouping is enabled
but the brokers can't do
+ // wildcard routing for those subscriptions.
+ server.getConfiguration().setWildCardConfiguration(new
WildcardConfiguration().setRoutingEnabled(false));
+ server.start();
+
+ final String wildcardAddress = getTestName() + ".#";
+ final String actualAddress = getTestName() + ".A";
+ // Sent on the receiver attach under FEDERATED_ADDRESS_SOURCE_PROPERTIES
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test", true);
+ peer.connect("localhost", AMQP_PORT);
+ // The broker's answering sender attach should use the wildcard FQQN as its source
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withTarget().withAddress(actualAddress).also()
+
.withSource().withAddress(wildcardAddress + "::federation-unique-queue-name");
+
+ // Connect to remote as if a queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withTarget().withAddress(actualAddress).also()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(wildcardAddress +
"::federation-unique-queue-name")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of(wildcardAddress)).isExists());
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic(actualAddress));
+
+ connection.start();
+
+ // We don't expect the federation to get this as the remote
broker has disabled wildcard
+ // routing but does not reject subscriptions on wildcard addresses
in this configuration.
+ // This is a case where user has not properly configured their
federation servers.
+ producer.send(session.createTextMessage("test-message"));
+ }
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+ /**
+  * Disables address auto-creation (queue auto-creation stays enabled), statically
+  * adds both the wildcard address and a concrete address under it, then has a
+  * scripted client peer attach a federation address receiver sourced from the
+  * wildcard address FQQN. The peer expects a transfer carrying "test-message"
+  * once that message is produced to the concrete address.
+  */
+ @Test
+ @Timeout(20)
+ public void
testFederationReceiverAttachedToWildcardAddressWhenAutoCreateAddressIsDisabledAndAddressesStaticallyDefined()
throws Exception {
+ final AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAutoCreateQueues(true);
+ addressSettings.setAutoCreateAddresses(false);
+
+ final String wildcardAddress = getTestName() + ".#";
+ final String actualAddress = getTestName() + ".A";
+ // Register the settings under the "#" match before starting the broker
+ server.getConfiguration().getAddressSettings().put("#", addressSettings);
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of(wildcardAddress),
RoutingType.MULTICAST));
+ server.addAddressInfo(new AddressInfo(SimpleString.of(actualAddress),
RoutingType.MULTICAST));
+ // Sent on the receiver attach under FEDERATED_ADDRESS_SOURCE_PROPERTIES
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test", true);
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withTarget().withAddress(actualAddress).also()
+
.withSource().withAddress(wildcardAddress + "::federation-unique-queue-name");
+
+ // Connect to remote as if a queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withTarget().withAddress(actualAddress).also()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(wildcardAddress +
"::federation-unique-queue-name")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ // Script the expected transfer with body "test-message" before granting credit
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().withMessage()
+ .withHeader().also()
+ .withMessageAnnotations().also()
+ .withProperties().also()
+ .withValue("test-message").and()
+ .accept();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of(wildcardAddress)).isExists());
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic(actualAddress));
+
+ connection.start();
+
+ producer.send(session.createTextMessage("test-message"));
+ }
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
protected void configureSecurity(ActiveMQServer server, String allowed,
String restricted, String... userAllowedOnly) {
ActiveMQJAASSecurityManager securityManager =
(ActiveMQJAASSecurityManager) server.getSecurityManager();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index e84355992e..ad5207323b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -67,6 +67,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -617,6 +618,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 314L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
policyMap.put(ADDRESS_EXCLUDES, excludes);
@@ -690,6 +692,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 314L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
policyMap.put(ADDRESS_EXCLUDES, excludes);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
index 53f4837c65..582056ea82 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
@@ -73,6 +73,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
@@ -1080,6 +1081,7 @@ class AMQPFederationManagementTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
final EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher(policyMap);
@@ -1282,6 +1284,7 @@ class AMQPFederationManagementTest extends
AmqpClientTestSupport {
policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
policyMap.put(ADDRESS_MAX_HOPS, 5);
policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+ policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
policyMap.put(ADDRESS_INCLUDES, includes);
final EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher(policyMap);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 7dd6ec9e47..e5c7a378c5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -24,6 +24,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -2160,4 +2161,293 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
assertNotNull(consumer.receiveNoWait());
}
}
+ // Runs the shared wildcard subscription scenario passing "CORE" as the client protocol
+ @Test
+ @Timeout(20)
+ public void
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledCore() throws
Exception {
+ doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled("CORE");
+ }
+ // Runs the shared wildcard subscription scenario passing "AMQP" as the client protocol
+ @Test
+ @Timeout(20)
+ public void
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledAMQP() throws
Exception {
+ doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled("AMQP");
+ }
+
+ /**
+  * Shared scenario for the wildcard grouping tests. The local broker is given an
+  * address policy that includes {@code <testName>.#} with wildcard groupings
+  * allowed; consumers are then created locally on two matching addresses and on
+  * one non-matching address.
+  * <p>
+  * Asserts that the remote broker holds two bindings on the wildcard address and
+  * neither concrete address, that each message sent on the remote reaches only
+  * the consumer of its own address, and that the wildcard bindings are gone once
+  * both matching consumers close.
+  *
+  * @param protocol protocol name handed to CFUtil.createConnectionFactory for the
+  *                 local and remote client connections
+  * @throws Exception on any broker or JMS client failure
+  */
+ private void
doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled(String
protocol) throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final SimpleString wildcardAddress = SimpleString.of(getTestName() +
".#");
+ final SimpleString fullAddressA = SimpleString.of(getTestName() + ".A");
+ final SimpleString fullAddressB = SimpleString.of(getTestName() + ".B");
+ final SimpleString unmatchedAddress = SimpleString.of(getTestName() +
"NoMatch");
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy.setName("local-test-policy");
+ localAddressPolicy.addToIncludes(wildcardAddress.toString());
+ localAddressPolicy.setAutoDelete(false);
+ localAddressPolicy.setAutoDeleteDelay(-1L);
+ localAddressPolicy.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+ localAddressPolicy.setAllowWildcardGroupings(true);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName() + ":Wildcards");
+ element.addLocalAddressPolicy(localAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection1 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection1.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection1);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:" +
SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topicA = sessionL.createTopic(fullAddressA.toString());
+ final Topic topicB = sessionL.createTopic(fullAddressB.toString());
+ final Topic topicUnmatched =
sessionL.createTopic(unmatchedAddress.toString());
+
+ final MessageConsumer consumerL =
sessionL.createConsumer(topicUnmatched);
+ final MessageConsumer consumerL_A = sessionL.createConsumer(topicA);
+ final MessageConsumer consumerL_B = sessionL.createConsumer(topicB);
+ final MessageProducer producerR = sessionR.createProducer(null);
+
+ connectionL.start();
+ connectionR.start();
+
+ // Local broker should see the full address but the remote should
only have a wildcard subscription
+ // The wildcard subscription should have two bindings since we have
two matching consumers
+ Wait.assertTrue(() -> server.bindingQuery(fullAddressA,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() -> server.bindingQuery(fullAddressB,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() ->
remoteServer.addressQuery(wildcardAddress).isExists());
+ Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress,
false).getQueueNames().size() == 2, 10_000, 50);
+
+ // Neither concrete address should exist on the remote; check A and B
+ assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+ assertFalse(remoteServer.addressQuery(fullAddressB).isExists());
+
+ final TextMessage remoteMessage =
sessionR.createTextMessage(getTestName());
+
+ producerR.send(topicA, remoteMessage);
+
+ final TextMessage localMessageA = (TextMessage)
consumerL_A.receive(1_000);
+
+ assertNotNull(localMessageA);
+ assertNull(consumerL_B.receiveNoWait());
+
+ producerR.send(topicB, remoteMessage);
+
+ final TextMessage localMessageB = (TextMessage)
consumerL_B.receive(1_000);
+
+ assertNotNull(localMessageB);
+ assertNull(consumerL_A.receiveNoWait());
+
+ consumerL_A.close();
+ consumerL_B.close();
+
+ Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress,
false).getQueueNames().size() == 0, 10_000, 50);
+
+ assertNull(consumerL.receiveNoWait());
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledUsingSingleWordMatcher()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final SimpleString wildcardAddress = SimpleString.of(getTestName() +
".*.region");
+ final SimpleString fullAddressA = SimpleString.of(getTestName() +
".A.region");
+ final SimpleString fullAddressB = SimpleString.of(getTestName() +
".B.region");
+ final SimpleString unmatchedAddress = SimpleString.of(getTestName() +
".C.regions");
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy.setName("local-test-policy");
+ localAddressPolicy.addToIncludes(wildcardAddress.toString());
+ localAddressPolicy.setAutoDelete(false);
+ localAddressPolicy.setAutoDeleteDelay(-1L);
+ localAddressPolicy.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+ localAddressPolicy.setAllowWildcardGroupings(true);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName() + ":Wildcards");
+ element.addLocalAddressPolicy(localAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection1 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection1.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection1);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topicA = sessionL.createTopic(fullAddressA.toString());
+ final Topic topicB = sessionL.createTopic(fullAddressB.toString());
+ final Topic topicUnmatched =
sessionL.createTopic(unmatchedAddress.toString());
+
+ final MessageConsumer consumerL =
sessionL.createConsumer(topicUnmatched);
+ final MessageConsumer consumerL_A = sessionL.createConsumer(topicA);
+ final MessageConsumer consumerL_B = sessionL.createConsumer(topicB);
+ final MessageProducer producerR = sessionR.createProducer(null);
+
+ connectionL.start();
+ connectionR.start();
+
+ // Local broker should see the full address but the remote should
only have a wildcard subscription
+ // The wildcard subscription should have two bindings since we have
two matching consumers
+ Wait.assertTrue(() -> server.bindingQuery(fullAddressA,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() -> server.bindingQuery(fullAddressB,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() ->
remoteServer.addressQuery(wildcardAddress).isExists());
+ Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress,
false).getQueueNames().size() == 2, 10_000, 50);
+
+ assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+ assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+
+ final TextMessage remoteMessage =
sessionR.createTextMessage(getTestName());
+
+ producerR.send(topicA, remoteMessage);
+
+ final TextMessage localMessageA = (TextMessage)
consumerL_A.receive(1_000);
+
+ assertNotNull(localMessageA);
+ assertNull(consumerL_B.receiveNoWait());
+
+ producerR.send(topicB, remoteMessage);
+
+ final TextMessage localMessageB = (TextMessage)
consumerL_B.receive(1_000);
+
+ assertNotNull(localMessageB);
+ assertNull(consumerL_A.receiveNoWait());
+
+ consumerL_A.close();
+ consumerL_B.close();
+
+ Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress,
false).getQueueNames().size() == 0, 10_000, 50);
+
+ assertNull(consumerL.receiveNoWait());
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteAddressFederationAppliesConsumerFiltersIfEnabledEvenWithWildcardSubscriptions()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final SimpleString wildcardAddress = SimpleString.of(getTestName() +
".#");
+ final SimpleString fullAddressA = SimpleString.of(getTestName() + ".A");
+
+ final AMQPFederationAddressPolicyElement remoteAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ remoteAddressPolicy.setName("test-policy");
+ remoteAddressPolicy.addToIncludes(wildcardAddress.toString());
+ remoteAddressPolicy.setAutoDelete(false);
+ remoteAddressPolicy.setAutoDeleteDelay(-1L);
+ remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
+ remoteAddressPolicy.setAllowWildcardGroupings(true);
+ remoteAddressPolicy.addProperty(IGNORE_ADDRESS_BINDING_FILTERS,
String.valueOf(false));
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addRemoteAddressPolicy(remoteAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "failover:(amqp://localhost:" +
SERVER_PORT_REMOTE + ")");
+
+ final Connection connectionL = factoryLocal.createConnection();
+ final Connection connectionR = factoryRemote.createConnection();
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionR.createTopic(fullAddressA.toString());
+
+ final MessageConsumer consumerR = sessionR.createConsumer(topic,
"color='red'");
+
+ connectionL.start();
+ connectionR.start();
+
+ Wait.assertTrue(() -> server.addressQuery(wildcardAddress).isExists());
+ Wait.assertTrue(() -> server.bindingQuery(wildcardAddress,
false).getQueueNames().size() == 1);
+
+ Wait.assertTrue(() ->
remoteServer.addressQuery(fullAddressA).isExists());
+ Wait.assertTrue(() -> remoteServer.bindingQuery(fullAddressA,
false).getQueueNames().size() == 1);
+
+ try {
+ final MessageProducer producerL = sessionL.createProducer(topic);
+ final TextMessage message = sessionL.createTextMessage();
+
+ message.setText("First Red Message");
+ message.setStringProperty("color", "red");
+ producerL.send(message);
+
+ // Message that matched consumer filter should federate
+ final Message received1 = consumerR.receive(5_000);
+
+ assertNotNull(received1);
+ assertInstanceOf(TextMessage.class, received1);
+ assertEquals("First Red Message", ((TextMessage)
received1).getText());
+ assertTrue(received1.propertyExists("color"));
+ assertEquals("red", received1.getStringProperty("color"));
+
+ // should be filtered and not sent over the federation link from the
remote server.
+ message.setText("Hello World Blue");
+ message.setStringProperty("color", "blue");
+ producerL.send(message);
+
+ message.setText("Second Red Message");
+ message.setStringProperty("color", "red");
+ producerL.send(message);
+
+ final Message received2 = consumerR.receive(5_000);
+
+ assertNotNull(received2);
+ assertInstanceOf(TextMessage.class, received2);
+ assertEquals("Second Red Message", ((TextMessage)
received2).getText());
+ assertTrue(received2.propertyExists("color"));
+ assertEquals("red", received2.getStringProperty("color"));
+
+ // Should be no more messages
+ assertNull(consumerR.receiveNoWait());
+
+ final org.apache.activemq.artemis.core.server.Queue localQueue =
+ server.locateQueue(server.bindingQuery(wildcardAddress,
false).getQueueNames().get(0));
+
+ // Filtered message should not be federated so not added on the local
server widlcard subscription binding.
+ assertEquals(2, localQueue.getMessagesAdded());
+ } finally {
+ connectionL.close();
+ connectionR.close();
+
+ server.stop();
+ remoteServer.stop();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]