This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a2bb690e46 ARTEMIS-5953 Allow address federation bindings to group 
under a wildcard
a2bb690e46 is described below

commit a2bb690e468d7956a4c83f10efa65f09a6d66f45
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 13 16:15:56 2026 -0400

    ARTEMIS-5953 Allow address federation bindings to group under a wildcard
    
    Add configuration that enables the address federation policy matches to 
define if
    a remote wildcard address will be used to host the address receiver 
bindings for
    the matching addresses if the match value is itself a wildcard. This can be 
useful
    if federating large numbers of addresses that have low traffic as it 
reduces the
    number of addresses that need to exist on the remote federation target at 
the
    expense of higher CPU load on the target in order to route messages to the
    appropriate wildcard bindings for the matched addresses. The default value 
will
    be `false` and the federation receiver address bindings will be attached to 
the
    matching addresses on the remote as they always have. When enabled the 
remote
    broker must have wildcard routing enabled for this feature to work 
correctly.
---
 .../AMQPFederationAddressBindingsConsumer.java     |   2 +-
 .../federation/AMQPFederationAddressConsumer.java  |  12 +-
 .../AMQPFederationAddressPolicyManager.java        | 183 +++++---
 .../federation/AMQPFederationConstants.java        |   8 +
 .../AMQPFederationConsumerControlType.java         |   4 +-
 .../AMQPFederationGenericConsumerInfo.java         |  69 ++-
 .../AMQPFederationManagementSupport.java           |  32 +-
 .../federation/AMQPFederationPolicySupport.java    |  47 +-
 .../federation/AMQPFederationQueueConsumer.java    |   9 +-
 .../AMQPFederationQueuePolicyManager.java          |   3 +-
 .../amqp/federation/FederationConsumerInfo.java    |  29 +-
 .../FederationReceiveFromAddressPolicy.java        |  61 ++-
 .../AMQPFederationPolicySupportTest.java           |  57 ++-
 .../AMQPFederationAddressPolicyElement.java        |  15 +-
 .../deployers/impl/FileConfigurationParser.java    |   2 +
 .../resources/schema/artemis-configuration.xsd     |   1 +
 .../AMQPFederationAddressPolicyElementTest.java    |   8 +
 .../core/config/impl/ConfigurationImplTest.java    |   3 +
 .../resources/ConfigurationTest-full-config.xml    |   4 +-
 .../amqp-federation-configuration-glossary.adoc    |   6 +
 .../connect/AMQPFederationAddressPolicyTest.java   | 511 ++++++++++++++++++++-
 .../amqp/connect/AMQPFederationConnectTest.java    |   3 +
 .../amqp/connect/AMQPFederationManagementTest.java |   3 +
 .../connect/AMQPFederationServerToServerTest.java  | 290 ++++++++++++
 24 files changed, 1175 insertions(+), 187 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
index 8499eaabf0..7abe3021c6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressBindingsConsumer.java
@@ -146,7 +146,7 @@ public final class AMQPFederationAddressBindingsConsumer 
extends AMQPFederationA
 
          delivery.setContext(message);
 
-         message.setAddress(consumerInfo.getAddress());
+         message.setAddress(consumerInfo.getTargetAddress());
          
message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
          if (message.getMessageID() <= 0) {
             message.setMessageID(storageManager.generateID());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index 483e9729cd..28602201be 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -96,7 +96,7 @@ public abstract class AMQPFederationAddressConsumer extends 
AMQPFederationConsum
       if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
          return "federation-" + federation.getName() +
                 "-policy-" + policy.getPolicyName() +
-                "-address-receiver-" + consumerInfo.getAddress() +
+                "-address-receiver-" + consumerInfo.getTargetAddress() +
                 "-" + federation.getServer().getNodeID() +
                 "-" + LINK_SEQUENCE_ID.incrementAndGet();
       } else {
@@ -107,7 +107,7 @@ public abstract class AMQPFederationAddressConsumer extends 
AMQPFederationConsum
          // connection recovery which is arguably less broken than the 
sequence ID variant which
          // creates unstable subscription queues that can be orphaned or 
consumed out of order.
          return "federation-" + federation.getName() +
-                "-address-receiver-" + consumerInfo.getAddress() +
+                "-address-receiver-" + consumerInfo.getTargetAddress() +
                 "-" + federation.getServer().getNodeID();
       }
    }
@@ -146,12 +146,12 @@ public abstract class AMQPFederationAddressConsumer 
extends AMQPFederationConsum
          source.setFilter(filtersMap);
 
          if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
-            source.setAddress(consumerInfo.getFqqn());
+            source.setAddress(consumerInfo.getSourceFqqn());
          } else {
-            source.setAddress(consumerInfo.getAddress()); // Legacy behavior
+            source.setAddress(consumerInfo.getTargetAddress()); // Legacy 
behavior where both sides would be the target value
          }
 
-         target.setAddress(consumerInfo.getAddress());
+         target.setAddress(consumerInfo.getTargetAddress());
 
          final Map<String, Object> addressSourceProperties = new HashMap<>();
          // If the remote needs to create the address then it should apply 
these
@@ -303,7 +303,7 @@ public abstract class AMQPFederationAddressConsumer extends 
AMQPFederationConsum
       AMQPFederatedAddressDeliveryHandler(AMQPSessionContext session, 
FederationConsumerInfo consumerInfo, Receiver receiver) {
          super(session.getSessionSPI(), session.getAMQPConnectionContext(), 
session, receiver);
 
-         this.cachedAddress = SimpleString.of(consumerInfo.getAddress());
+         this.cachedAddress = SimpleString.of(consumerInfo.getTargetAddress());
       }
 
       @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 6703715492..88789c20b6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -48,7 +48,6 @@ import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerIn
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
-import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -254,21 +253,10 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
                return;
             }
 
-            // Target brokers which have been sent remote federation policies 
might not have write
-            // access via the logged in user to the address with demand which 
we are attempting to
-            // federate messages to so instead of creating a receiver that 
will fail when the remote
-            // routes a message to it we can just omit creating the link in 
the first place.
-            if (federation.isFederationTarget()) {
-               try {
-                  session.getSessionSPI().check(addressInfo.getName(), 
CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
-               } catch (ActiveMQSecurityException e) {
-                  
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
-                     addressInfo.getName().toString(), "User does not have 
send permission to configured address.");
-                  return;
-               } catch (Exception ex) {
-                  logger.warn("Caught unknown exception from security check on 
address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
-                  return;
-               }
+            // If unable to write to the given address then we can stop here 
and not create a remote
+            // receiver that will fail once a message is routed.
+            if (isAddressSecurityBlockingWrites(addressInfo.getName())) {
+               return;
             }
 
             createOrUpdateFederatedAddressConsumerForBinding(addressInfo, 
queueBinding);
@@ -298,6 +286,27 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       return false;
    }
 
+   private boolean isAddressSecurityBlockingWrites(SimpleString address) {
+      // Target brokers which have been sent remote federation policies might 
not have write
+      // access via the logged in user to the address with demand which we are 
attempting to
+      // federate messages to so instead of creating a receiver that will fail 
when the remote
+      // routes a message to it we can just omit creating the link in the 
first place.
+      if (federation.isFederationTarget()) {
+         try {
+            session.getSessionSPI().check(address, CheckType.SEND, 
federation.getConnectionContext().getSecurityAuth());
+         } catch (ActiveMQSecurityException e) {
+            
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
+               address.toString(), "User does not have send permission to 
configured address.");
+            return true;
+         } catch (Exception ex) {
+            logger.warn("Caught unknown exception from security check on 
address:{} send permissions: cannot federate:", address, ex);
+            return true;
+         }
+      }
+
+      return false;
+   }
+
    private void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding 
divertBinding) {
       if (!policy.isEnableDivertBindings()) {
          return;
@@ -309,21 +318,9 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
          return;
       }
 
-      // Target brokers which have been sent remote federation policies might 
not have write access
-      // via the logged in user to the address this divert is attached to 
which means we don't need
-      // to track this divert. Since we don't add the divert to the tracking 
map future demand on
-      // divert that would otherwise match the address includes won't trigger 
federation attempts.
-      if (federation.isFederationTarget()) {
-         try {
-            session.getSessionSPI().check(addressInfo.getName(), 
CheckType.SEND, federation.getConnectionContext().getSecurityAuth());
-         } catch (ActiveMQSecurityException e) {
-            
ActiveMQAMQPProtocolLogger.LOGGER.federationTargetSkippedAddressFederation(
-               addressInfo.getName().toString(), "User does not have send 
permission to configured address.");
-            return;
-         } catch (Exception ex) {
-            logger.warn("Caught unknown exception from security check on 
address:{} send permissions: cannot federate:", addressInfo.getName(), ex);
-            return;
-         }
+      // Don't track the divert if we cannot write to the address with the 
divert
+      if (isAddressSecurityBlockingWrites(addressInfo.getName())) {
+         return;
       }
 
       // We only need to check if we've never seen the divert before, 
afterwards we will
@@ -486,6 +483,26 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       return policy.test(address, type);
    }
 
+   /**
+    * Called when demand for an address has been determined to trigger need 
for a remote receiver to
+    * federate messages back to this peer and wildcard subscriptions has been 
enabled. The value returned
+    * is the first configured match in the policy. If there are multiple 
matchers in the policy that
+    * could match on this address only the first one located should be 
returned as having more than one
+    * positive matcher is a configuration issue the user is on the hook if 
this result in unexpected
+    * behaviors from the federation manager. The result could be an exact 
match to the address name or it
+    * could be a wildcard pattern match that captures this address in its 
scope.
+    *
+    * @param address
+    *    The address that has triggered demand and requires a new remote 
consumer.
+    * @param type
+    *    The routing type that the address was created with.
+    *
+    * @return the first matcher string from the configured address policy that 
matches on the target address
+    */
+   private String getMatcherForAddress(String address, RoutingType type) {
+      return policy.getFirstMatchingAddressPattern(address, type);
+   }
+
    private static boolean isAddressInDivertForwards(final SimpleString 
targetAddress, final SimpleString forwardAddress) {
       final SimpleString[] forwardAddresses = forwardAddress.split(',');
 
@@ -572,10 +589,13 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
          AMQPFederationAddressConsumerManager<?> consumerManager = 
registry.get(key);
 
          if (consumerManager == null) {
-            if (isUseConduitConsumer()) {
-               registry.put(key, consumerManager = new 
AMQPFederationAddressConduitConsumerManager(manager, 
createConsumerInfo(addressInfo, binding), addressInfo));
+            final boolean useConduitConsumer = isUseConduitConsumer();
+            final FederationConsumerInfo consumerInfo = 
createConsumerInfo(binding, useConduitConsumer);
+
+            if (useConduitConsumer) {
+               registry.put(key, consumerManager = new 
AMQPFederationAddressConduitConsumerManager(manager, consumerInfo, 
addressInfo));
             } else {
-               registry.put(key, consumerManager = new 
AMQPFederationAddressBindingsConsumerManager(manager, 
createConsumerInfo(addressInfo, binding), addressInfo));
+               registry.put(key, consumerManager = new 
AMQPFederationAddressBindingsConsumerManager(manager, consumerInfo, 
addressInfo));
             }
          }
 
@@ -619,37 +639,67 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       }
 
       /**
-       * Create a new {@link FederationConsumerInfo} based on the given {@link 
AddressInfo} and the configured
-       * {@link FederationReceiveFromAddressPolicy}.
+       * Create a new {@link FederationConsumerInfo} based on the given data 
from this address consumer
+       * registry and the binding that generated the demand that triggered 
creation of a consumer.
        *
-       * @param address
-       *       The {@link AddressInfo} to use as a basis for the consumer 
information object.
        * @param binding
        *       The {@link Binding} that is the source of demand for this 
consumer info object
+       * @param conduitConsumer
+       *       Is the consumer being created a conduit consumer or a direct to 
bindings consumer.
        *
        * @return a new {@link FederationConsumerInfo} instance based on the 
given address
        */
-      private FederationConsumerInfo createConsumerInfo(AddressInfo address, 
Binding binding) {
-         final String addressName = address.getName().toString();
-         final boolean ignoreBindingFilters = isUseConduitConsumer() || 
binding.getFilter() == null;
-         final String generatedQueueName = generateQueueName(address, binding, 
ignoreBindingFilters);
-         final String consumerFilter;
-
-         if (ignoreBindingFilters) {
-            consumerFilter = baseConsumerFilter;
-         } else if (baseConsumerFilter != null) {
-            consumerFilter = "(" + binding.getFilter().getFilterString() + ") 
AND " + baseConsumerFilter;
+      private FederationConsumerInfo createConsumerInfo(Binding binding, 
boolean conduitConsumer) {
+         final boolean allowWildcardGroupings = isAllowWildcardGroupings();
+         final String targetAddress = addressInfo.getName().toString();
+         final String sourceAddress;
+         final String sourceFilter;
+
+         if (allowWildcardGroupings) {
+            final String matcher = 
manager.getMatcherForAddress(addressInfo.getName().toString(), 
addressInfo.getRoutingType());
+
+            if (manager.getWildcardConfiguration().isWild(matcher)) {
+               sourceAddress = matcher;
+               sourceFilter = "AMQAddress='" + targetAddress + "'";
+            } else {
+               sourceAddress = targetAddress;
+               sourceFilter = null;
+            }
          } else {
-            consumerFilter = binding.getFilter().getFilterString().toString();
+            sourceAddress = targetAddress;
+            sourceFilter = null;
          }
 
+         final boolean ignoreBindingFilters = conduitConsumer || 
binding.getFilter() == null;
+         final CharSequence bindingFilter = ignoreBindingFilters ? null : 
binding.getFilter().getFilterString();
+         final String generatedQueueName = generateQueueName(binding, 
ignoreBindingFilters);
+         final StringBuilder consumerFilter = new StringBuilder();
+
+         appendFilter(consumerFilter, sourceFilter, false);       // Per 
address filter for wildcard groups
+         appendFilter(consumerFilter, baseConsumerFilter, false); // Base 
federation filter for maxHops etc
+         appendFilter(consumerFilter, bindingFilter, true);       // Bindings 
filter if supporting per address filters
+
          return new 
AMQPFederationGenericConsumerInfo(FederationConsumerInfo.Role.ADDRESS_CONSUMER,
-            addressName,
-            generatedQueueName,
-            address.getRoutingType(),
-            consumerFilter,
-            CompositeAddress.toFullyQualified(addressName, generatedQueueName),
-            ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
+                                                      sourceAddress,
+                                                      targetAddress,
+                                                      generatedQueueName,
+                                                      
addressInfo.getRoutingType(),
+                                                      consumerFilter.isEmpty() 
? null : consumerFilter.toString(),
+                                                      
ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
+      }
+
+      private static void appendFilter(StringBuilder builder, CharSequence 
filter, boolean parenthesesIfNeeded) {
+         if (filter != null) {
+            if (!builder.isEmpty()) {
+               builder.append(" AND ");
+            }
+
+            if (parenthesesIfNeeded && !builder.isEmpty()) {
+               builder.append("(").append(filter).append(")");
+            } else {
+               builder.append(filter);
+            }
+         }
       }
 
       private boolean isUseConduitConsumer() {
@@ -671,16 +721,33 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
          }
       }
 
-      private String generateQueueName(AddressInfo address, Binding binding, 
boolean ignoreFilters) {
+      /*
+       * Should we allow the existing remote receivers to be bound to a 
wildcard address instead of an address
+       * that matches the target? We cannot if no FQQN but otherwise we can so 
long as we create a filter to
+       * account for the messages that will arrive at the wildcard root 
address that don't match the local target
+       * via a filter for the AMQAddress value matching the receiver target 
address..
+       */
+      private boolean isAllowWildcardGroupings() {
+         if (!manager.getCapabilities().isUseFQQNAddressSubscriptions()) {
+            // prior to FQQN subscription support we used a simple link name 
that would not be unique amongst
+            // multiple consumers on the same address so for most features or 
behaviors that come later we cannot
+            // action them properly and we must fallback to a conduit consumer 
strategy.
+            return false;
+         } else {
+            return manager.getPolicy().isAllowWildcardGroupings();
+         }
+      }
+
+      private String generateQueueName(Binding binding, boolean ignoreFilters) 
{
          if (ignoreFilters) {
             return "federation." + manager.getFederation().getName() +
                    ".policy." + manager.getPolicyName() +
-                   ".address." + address.getName() +
+                   ".address." + addressInfo.getName() +
                    ".node." + manager.server.getNodeID();
          } else {
             return "federation." + manager.getFederation().getName() +
                    ".policy." + manager.getPolicyName() +
-                   ".address." + address.getName() +
+                   ".address." + addressInfo.getName() +
                    ".filterId." + 
generateFilterId(binding.getFilter().getFilterString().toString()) +
                    ".node." + manager.server.getNodeID();
          }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index 38d0745517..e5cc4f9c10 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -298,6 +298,14 @@ public final class AMQPFederationConstants {
     */
    public static final String ADDRESS_ENABLE_DIVERT_BINDINGS = 
"enable-divert-bindings";
 
+   /**
+    * Encodes a boolean value that controls if the address federation should 
check the matcher value in the
+    * policy and if its a wildcard, create a consumer that indicates its 
source is the wildcard address of
+    * the matcher and place the normal address binding under that wildcard but 
add a filter to prevent any
+    * messages other than those of its target address from being routed to it.
+    */
+   public static final String ADDRESS_ALLOW_WILDCARD_GROUPINGS = 
"allow-wildcard-groupings";
+
    /**
     * Encodes a {@link Map} of String keys and values that are carried along 
in the federation policy (address or
     * queue). These values can be used to add extended configuration to the 
policy object such as overriding settings
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
index e69a311afd..45d4393828 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerControlType.java
@@ -86,7 +86,7 @@ public class AMQPFederationConsumerControlType extends 
AbstractControl implement
       }
       clearIO();
       try {
-         return consumerInfo.getAddress();
+         return consumerInfo.getTargetAddress();
       } finally {
          blockOnIO();
       }
@@ -99,7 +99,7 @@ public class AMQPFederationConsumerControlType extends 
AbstractControl implement
       }
       clearIO();
       try {
-         return consumerInfo.getFqqn();
+         return consumerInfo.getTargetFqqn();
       } finally {
          blockOnIO();
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
index f9627e1d30..bee2789bb9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationGenericConsumerInfo.java
@@ -20,11 +20,8 @@ package 
org.apache.activemq.artemis.protocol.amqp.connect.federation;
 import java.util.Objects;
 import java.util.UUID;
 
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
-import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 
 /**
@@ -37,48 +34,30 @@ public class AMQPFederationGenericConsumerInfo implements 
FederationConsumerInfo
    public static final String QUEUE_NAME_FORMAT_STRING = 
"${address}::${routeType}";
 
    private final Role role;
-   private final String address;
+   private final String sourceAddress;
+   private final String sourceFqqn;
+   private final String targetAddress;
+   private final String targetFqqn;
    private final String queueName;
    private final RoutingType routingType;
    private final String filterString;
-   private final String fqqn;
    private final int priority;
    private final String id;
 
-   public AMQPFederationGenericConsumerInfo(Role role, String address, String 
queueName, RoutingType routingType,
-                                        String filterString, String fqqn, int 
priority) {
+   public AMQPFederationGenericConsumerInfo(Role role, String sourceAddress, 
String targetAddress, String queueName,
+                                            RoutingType routingType, String 
filterString, int priority) {
       this.role = role;
-      this.address = address;
+      this.sourceAddress = sourceAddress;
+      this.targetAddress = targetAddress;
       this.queueName = queueName;
       this.routingType = routingType;
       this.filterString = filterString;
-      this.fqqn = fqqn;
+      this.sourceFqqn = CompositeAddress.toFullyQualified(sourceAddress, 
queueName);
+      this.targetFqqn = CompositeAddress.toFullyQualified(targetAddress, 
queueName);
       this.priority = priority;
       this.id = UUID.randomUUID().toString();
    }
 
-   /**
-    * Factory for creating federation address consumer information objects 
from server resources.
-    *
-    * @param address      The address being federated, the remote consumer 
will be created under this address.
-    * @param queueName    The name of the remote queue that will be created in 
order to route messages here.
-    * @param routingType  The routing type to assign the remote consumer.
-    * @param filterString A filter string used by the federation instance to 
limit what enters the remote queue.
-    * @param federation   The parent {@link Federation} that this federation 
consumer is created for
-    * @param policy       The {@link FederationReceiveFromAddressPolicy} that 
triggered this information object to be
-    *                     created.
-    * @return a newly created and configured {@link FederationConsumerInfo} 
instance
-    */
-   public static AMQPFederationGenericConsumerInfo build(String address, 
String queueName, RoutingType routingType, String filterString, Federation 
federation, FederationReceiveFromAddressPolicy policy) {
-      return new AMQPFederationGenericConsumerInfo(Role.ADDRESS_CONSUMER,
-                                               address,
-                                               queueName,
-                                               routingType,
-                                               filterString,
-                                               
CompositeAddress.toFullyQualified(address, queueName),
-                                               
ActiveMQDefaultConfiguration.getDefaultConsumerPriority());
-   }
-
    @Override
    public String getId() {
       return id;
@@ -95,13 +74,23 @@ public class AMQPFederationGenericConsumerInfo implements 
FederationConsumerInfo
    }
 
    @Override
-   public String getAddress() {
-      return address;
+   public String getSourceAddress() {
+      return sourceAddress;
+   }
+
+   @Override
+   public String getTargetAddress() {
+      return targetAddress;
+   }
+
+   @Override
+   public String getSourceFqqn() {
+      return sourceFqqn;
    }
 
    @Override
-   public String getFqqn() {
-      return fqqn;
+   public String getTargetFqqn() {
+      return targetFqqn;
    }
 
    @Override
@@ -130,20 +119,22 @@ public class AMQPFederationGenericConsumerInfo implements 
FederationConsumerInfo
 
       return role == other.role &&
              priority == other.priority &&
-             Objects.equals(address, other.address) &&
+             Objects.equals(sourceAddress, other.sourceAddress) &&
+             Objects.equals(targetAddress, other.targetAddress) &&
              Objects.equals(queueName, other.queueName) &&
              routingType == other.routingType &&
              Objects.equals(filterString, other.filterString) &&
-             Objects.equals(fqqn, other.fqqn);
+             Objects.equals(sourceFqqn, other.sourceFqqn) &&
+             Objects.equals(targetFqqn, other.targetFqqn);
    }
 
    @Override
    public int hashCode() {
-      return Objects.hash(role, address, queueName, routingType, filterString, 
fqqn, priority);
+      return Objects.hash(role, sourceAddress, targetAddress, queueName, 
routingType, filterString, sourceFqqn, targetFqqn, priority);
    }
 
    @Override
    public String toString() {
-      return "FederationConsumerInfo: { " + getRole() + ", " + getFqqn() + "}";
+      return "FederationConsumerInfo: { " + getRole() + ", " + getSourceFqqn() 
+ "}";
    }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
index c42f9715c0..fd54fa9502 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java
@@ -413,13 +413,13 @@ public abstract class AMQPFederationManagementSupport {
       final String policyName = manager.getPolicyName();
 
       if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-         
management.registerUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getAddress()),
+         
management.registerUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getTargetAddress()),
                                            control,
-                                           
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getAddress()));
+                                           
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getTargetAddress()));
       } else {
-         
management.registerUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getFqqn()),
+         
management.registerUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getTargetFqqn()),
                                            control,
-                                           
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getFqqn()));
+                                           
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getTargetFqqn()));
       }
    }
 
@@ -439,11 +439,11 @@ public abstract class AMQPFederationManagementSupport {
       final String policyName = manager.getPolicyName();
 
       if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-         
management.unregisterUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getAddress()),
-                                             
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getAddress()));
+         
management.unregisterUntypedControl(getFederationSourceAddressConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getTargetAddress()),
+                                             
getFederationSourceAddressConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getTargetAddress()));
       } else {
-         
management.unregisterUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getFqqn()),
-                                             
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getFqqn()));
+         
management.unregisterUntypedControl(getFederationSourceQueueConsumerResourceName(brokerConnectionName,
 federationName, policyName, consumer.getConsumerInfo().getTargetFqqn()),
+                                             
getFederationSourceQueueConsumerObjectName(management, brokerConnectionName, 
federationName, manager.getPolicyType().toString(), policyName, 
consumer.getConsumerInfo().getTargetFqqn()));
       }
    }
 
@@ -575,13 +575,13 @@ public abstract class AMQPFederationManagementSupport {
       final String policyName = manager.getPolicyName();
 
       if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-         
management.registerUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getAddress()),
+         
management.registerUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getTargetAddress()),
                                            control,
-                                           
getFederationTargetAddressConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getAddress()));
+                                           
getFederationTargetAddressConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getTargetAddress()));
       } else {
-         
management.registerUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getFqqn()),
+         
management.registerUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getTargetFqqn()),
                                            control,
-                                           
getFederationTargetQueueConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getFqqn()));
+                                           
getFederationTargetQueueConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getTargetFqqn()));
       }
    }
 
@@ -601,11 +601,11 @@ public abstract class AMQPFederationManagementSupport {
       final String policyName = manager.getPolicyName();
 
       if (consumer.getRole() == FederationConsumerInfo.Role.ADDRESS_CONSUMER) {
-         
management.unregisterUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getAddress()),
-                                             
getFederationTargetAddressConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getAddress()));
+         
management.unregisterUntypedControl(getFederationTargetAddressConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getTargetAddress()),
+                                             
getFederationTargetAddressConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getTargetAddress()));
       } else {
-         
management.unregisterUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getFqqn()),
-                                             
getFederationTargetQueueConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getFqqn()));
+         
management.unregisterUntypedControl(getFederationTargetQueueConsumerResourceName(remoteNodeId,
 brokerConnectionName, federationName, policyName, 
consumer.getConsumerInfo().getTargetFqqn()),
+                                             
getFederationTargetQueueConsumerObjectName(management, remoteNodeId, 
brokerConnectionName, federationName, manager.getPolicyType().toString(), 
policyName, consumer.getConsumerInfo().getTargetFqqn()));
       }
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
index 1bf0494ea5..7212cfdd9a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupport.java
@@ -21,6 +21,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -105,6 +106,7 @@ public final class AMQPFederationPolicySupport {
     * is created as these values are used to indicate no max hops for 
federated messages on an address.
     *
     * @param maxHops The max allowed number of hops before a message should 
stop crossing federation links.
+    *
     * @return the address filter string that should be applied (or null)
     */
    public static String generateAddressFilter(int maxHops) {
@@ -125,6 +127,7 @@ public final class AMQPFederationPolicySupport {
     * {@link FederationReceiveFromQueuePolicy}.
     *
     * @param policy The policy to encode into an AMQP message.
+    *
     * @return an AMQP Message with the encoded policy
     */
    public static AMQPMessage 
encodeQueuePolicyControlMessage(FederationReceiveFromQueuePolicy policy) {
@@ -194,6 +197,7 @@ public final class AMQPFederationPolicySupport {
     * {@link FederationReceiveFromAddressPolicy}.
     *
     * @param policy The policy to encode into an AMQP message.
+    *
     * @return an AMQP Message with the encoded policy
     */
    public static AMQPMessage 
encodeAddressPolicyControlMessage(FederationReceiveFromAddressPolicy policy) {
@@ -211,6 +215,7 @@ public final class AMQPFederationPolicySupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 
policy.getAutoDeleteMessageCount());
       policyMap.put(ADDRESS_MAX_HOPS, policy.getMaxHops());
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, 
policy.isEnableDivertBindings());
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, 
policy.isAllowWildcardGroupings());
       if (!policy.getIncludes().isEmpty()) {
          policyMap.put(ADDRESS_INCLUDES, new 
ArrayList<>(policy.getIncludes()));
       }
@@ -250,11 +255,16 @@ public final class AMQPFederationPolicySupport {
    /**
     * Given an AMQP Message decode an {@link FederationReceiveFromQueuePolicy} 
from it and return the decoded value. The
     * message should have already been inspected and determined to be an 
control message of the add to policy type.
+    * The decoder ignores elements in the message that it does not know about 
so that future configuration additions do
+    * not break previous broker instances.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
     *
-    * @param message        The {@link AMQPMessage} that should carry an 
encoded
-    *                       {@link FederationReceiveFromQueuePolicy}
-    * @param wildcardConfig The {@link WildcardConfiguration} to use in the 
decoded policy.
     * @return a decoded {@link FederationReceiveFromQueuePolicy} instance
+    *
     * @throws ActiveMQException if an error occurs while decoding the policy.
     */
    @SuppressWarnings("unchecked")
@@ -359,11 +369,16 @@ public final class AMQPFederationPolicySupport {
    /**
     * Given an AMQP Message decode an {@link 
FederationReceiveFromAddressPolicy} from it and return the decoded value.
     * The message should have already been inspected and determined to be an 
control message of the add to policy type.
+    * The decoder ignores elements in the message that it does not know about 
so that future configuration additions do
+    * not break previous broker instances.
+    *
+    * @param message
+    *    The {@link AMQPMessage} that should carry an encoded {@link 
FederationReceiveFromQueuePolicy}
+    * @param wildcardConfig
+    *    The {@link WildcardConfiguration} to use in the decoded policy.
     *
-    * @param message        The {@link AMQPMessage} that should carry an 
encoded
-    *                       {@link FederationReceiveFromQueuePolicy}
-    * @param wildcardConfig The {@link WildcardConfiguration} to use in the 
decoded policy.
     * @return a decoded {@link FederationReceiveFromAddressPolicy} instance
+    *
     * @throws ActiveMQException if an error occurs during the policy decode.
     */
    @SuppressWarnings("unchecked")
@@ -399,6 +414,7 @@ public final class AMQPFederationPolicySupport {
          final long autoDeleteMsgCount = ((Number) 
policyMap.getOrDefault(ADDRESS_AUTO_DELETE_MSG_COUNT, 0L)).longValue();
          final int maxHops = ((Number) 
policyMap.get(ADDRESS_MAX_HOPS)).intValue();
          final boolean enableDiverts = (Boolean) 
policyMap.getOrDefault(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+         final boolean allowWildcardGroupings = (Boolean) 
policyMap.getOrDefault(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
 
          final Set<String> includes;
          final Set<String> excludes;
@@ -435,8 +451,8 @@ public final class AMQPFederationPolicySupport {
 
          return new FederationReceiveFromAddressPolicy(policyName, autoDelete, 
autoDeleteDelay,
                                                        autoDeleteMsgCount, 
maxHops, enableDiverts,
-                                                       includes, excludes, 
properties, transformerConfig,
-                                                       wildcardConfig);
+                                                       allowWildcardGroupings, 
includes, excludes,
+                                                       properties, 
transformerConfig, wildcardConfig);
       } catch (Exception e) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.malformedFederationControlMessage(
             "Invalid encoded address policy entry: " + e.getMessage());
@@ -447,8 +463,11 @@ public final class AMQPFederationPolicySupport {
     * From the broker AMQP broker connection configuration element and the 
configured wild-card settings create an
     * address match policy.
     *
-    * @param element   The broker connections element configuration that 
creates this policy.
-    * @param wildcards The configured wild-card settings for the broker or 
defaults.
+    * @param element
+    *    The broker connections configuration that creates this federation 
address policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
     * @return a new address match and handling policy for use in the broker 
connection
     */
    public static FederationReceiveFromAddressPolicy 
create(AMQPFederationAddressPolicyElement element, WildcardConfiguration 
wildcards) {
@@ -481,6 +500,7 @@ public final class AMQPFederationPolicySupport {
          Objects.requireNonNullElse(element.getAutoDeleteMessageCount(), 0L),
          element.getMaxHops(),
          Objects.requireNonNullElse(element.isEnableDivertBindings(), false),
+         Objects.requireNonNullElse(element.isAllowWildcardGroupings(), false),
          includes,
          excludes,
          element.getProperties(),
@@ -496,8 +516,11 @@ public final class AMQPFederationPolicySupport {
     * in order to attempt to prevent federation consumers from consuming 
messages on the remote when a local consumer is
     * present.
     *
-    * @param element   The broker connections element configuration that 
creates this policy.
-    * @param wildcards The configured wild-card settings for the broker or 
defaults.
+    * @param element
+    *    The broker connections configuration that creates this federation 
queue policy.
+    * @param wildcards
+    *    The configured wild-card settings for the broker or defaults.
+    *
     * @return a new queue match and handling policy for use in the broker 
connection
     */
    public static FederationReceiveFromQueuePolicy 
create(AMQPFederationQueuePolicyElement element, WildcardConfiguration 
wildcards) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index b80b0d218e..3f796de9c9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -87,7 +87,7 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
    private String generateLinkName() {
       return "federation-" + federation.getName() +
              "-policy-" + policy.getPolicyName() +
-             "-queue-receiver-" + consumerInfo.getFqqn() +
+             "-queue-receiver-" + consumerInfo.getTargetFqqn() +
              "-" + federation.getServer().getNodeID() + ":" +
              LINK_SEQUENCE_ID.getAndIncrement();
    }
@@ -103,7 +103,6 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
          final Receiver protonReceiver = 
session.getSession().receiver(generateLinkName());
          final Target target = new Target();
          final Source source = new Source();
-         final String address = consumerInfo.getFqqn();
          final Queue localQueue = 
federation.getServer().locateQueue(consumerInfo.getQueueName());
 
          if (RoutingType.ANYCAST.equals(consumerInfo.getRoutingType())) {
@@ -116,7 +115,7 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
          source.setDefaultOutcome(DEFAULT_OUTCOME);
          source.setDurable(TerminusDurability.NONE);
          source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-         source.setAddress(address);
+         source.setAddress(consumerInfo.getSourceFqqn());
 
          if (consumerInfo.getFilterString() != null && 
!consumerInfo.getFilterString().isEmpty()) {
             final AmqpJmsSelectorFilter jmsFilter = new 
AmqpJmsSelectorFilter(consumerInfo.getFilterString());
@@ -126,7 +125,7 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
             source.setFilter(filtersMap);
          }
 
-         target.setAddress(address);
+         target.setAddress(consumerInfo.getTargetFqqn());
 
          final Map<Symbol, Object> receiverProperties = new HashMap<>();
          receiverProperties.put(FEDERATION_RECEIVER_PRIORITY, 
consumerInfo.getPriority());
@@ -248,7 +247,7 @@ public final class AMQPFederationQueueConsumer extends 
AMQPFederationConsumer {
          super(session.getSessionSPI(), session.getAMQPConnectionContext(), 
session, receiver);
 
          this.localQueue = localQueue;
-         this.cachedFqqn = SimpleString.of(consumerInfo.getFqqn());
+         this.cachedFqqn = SimpleString.of(consumerInfo.getTargetFqqn());
       }
 
       @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
index 041e75a114..5aeb8b137a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
@@ -45,7 +45,6 @@ import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFro
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
-import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -292,11 +291,11 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
       final String filterString = selectFilter(consumer);
 
       return new AMQPFederationGenericConsumerInfo(Role.QUEUE_CONSUMER,
+                                                   address, // Source and 
target address are the same for Queue consumers
                                                    address,
                                                    queueName,
                                                    queue.getRoutingType(),
                                                    filterString,
-                                                   
CompositeAddress.toFullyQualified(address, queueName),
                                                    priority);
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
index 87f37560ea..69af9ac475 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationConsumerInfo.java
@@ -59,21 +59,38 @@ public interface FederationConsumerInfo {
    String getQueueName();
 
    /**
-    * Gets the address that will be used for this federation consumer instance.
+    * Gets the address that will be the source of messages for the federation 
consumer instance.
     * <p>
     * For Queue federation this is the address under which the matching queue 
must reside. For Address federation this
-    * is the actual address whose messages are being federated.
+    * is the actual address whose messages are being federated from on the 
remote broker..
     *
-    * @return the address associated with this federation consumer
+    * @return the source address associated with this federation consumer
     */
-   String getAddress();
+   String getSourceAddress();
+
+   /**
+    * Gets the address that will be the target of messages for the federation 
consumer instance.
+    * <p>
+    * For Queue federation this is the address under which the matching queue 
must reside. For Address federation this
+    * is the actual address whose messages are being federated to on the local 
broker.
+    *
+    * @return the target address associated with this federation consumer
+    */
+   String getTargetAddress();
 
    /**
     * Gets the FQQN that comprises the address and queue where the remote 
consumer will be attached.
     *
-    * @return provides the FQQN that can be used to address the consumer queue 
directly
+    * @return provides the FQQN that can be used to address the remote 
consumer queue directly
+    */
+   String getSourceFqqn();
+
+   /**
+    * Gets the FQQN that comprises the address and queue where the local 
sender will be attached..
+    *
+    * @return provides the FQQN that can be used to address the local sender 
queue directly
     */
-   String getFqqn();
+   String getTargetFqqn();
 
    /**
     * Gets the routing type that will be requested when creating a consumer on 
the remote server.
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
index 8620b67610..9c69395d68 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/federation/FederationReceiveFromAddressPolicy.java
@@ -52,11 +52,12 @@ public class FederationReceiveFromAddressPolicy implements 
FederationReceiveFrom
    private final long autoDeleteMessageCount;
    private final int maxHops;
    private final boolean enableDivertBindings;
+   private final boolean allowWildcardGroupings;
    private final Map<String, Object> properties;
    private final TransformerConfiguration transformerConfig;
 
-   public FederationReceiveFromAddressPolicy(String name, boolean autoDelete, 
long autoDeleteDelay,
-                                             long autoDeleteMessageCount, int 
maxHops, boolean enableDivertBindings,
+   public FederationReceiveFromAddressPolicy(String name, boolean autoDelete, 
long autoDeleteDelay, long autoDeleteMessageCount,
+                                             int maxHops, boolean 
enableDivertBindings, boolean allowWildcardGroupings,
                                              Collection<String> 
includeAddresses, Collection<String> excludeAddresses,
                                              Map<String, Object> properties, 
TransformerConfiguration transformerConfig,
                                              WildcardConfiguration 
wildcardConfig) {
@@ -69,6 +70,7 @@ public class FederationReceiveFromAddressPolicy implements 
FederationReceiveFrom
       this.autoDeleteMessageCount = autoDeleteMessageCount;
       this.maxHops = maxHops;
       this.enableDivertBindings = enableDivertBindings;
+      this.allowWildcardGroupings = allowWildcardGroupings;
       this.transformerConfig = transformerConfig;
       this.includes = 
Collections.unmodifiableCollection(Objects.requireNonNullElse(includeAddresses, 
Collections.emptyList()));
       this.excludes = 
Collections.unmodifiableCollection(Objects.requireNonNullElse(excludeAddresses, 
Collections.emptyList()));
@@ -114,6 +116,10 @@ public class FederationReceiveFromAddressPolicy implements 
FederationReceiveFrom
       return enableDivertBindings;
    }
 
+   public boolean isAllowWildcardGroupings() {
+      return allowWildcardGroupings;
+   }
+
    public Collection<String> getIncludes() {
       return includes;
    }
@@ -137,6 +143,7 @@ public class FederationReceiveFromAddressPolicy implements 
FederationReceiveFrom
     * {@link SimpleString} object or any null checks.
     *
     * @param addressInfo The address info to check which if null will result 
in a negative result.
+    *
     * @return {@code true} if the address value matches this configured policy
     */
    public boolean test(AddressInfo addressInfo) {
@@ -147,6 +154,18 @@ public class FederationReceiveFromAddressPolicy implements 
FederationReceiveFrom
       }
    }
 
+   /**
+    * Test method that accepts the raw address name and its routing type and 
checks the set of
+    * includes and excludes to determine if the address matches any configured 
matchers in this
+    * policy.
+    *
+    * @param address
+    *       The address being tested for a match against this policy.
+    * @param type
+    *       The routing type associated with the address under test.
+    *
+    * @return <code>true</code> if the address matches this policies 
configuration.
+    */
    @Override
    public boolean test(String address, RoutingType type) {
       if (RoutingType.MULTICAST.equals(type)) {
@@ -166,18 +185,56 @@ public class FederationReceiveFromAddressPolicy 
implements FederationReceiveFrom
       return false;
    }
 
+   /**
+    * Returns the first matcher string that returns a positive match with the 
given address.
+    * Excludes are checked in order to prevent an inconsistency between what 
the test methods
+    * return and what this method returns. If test would have failed this 
method will also fail
+    * to match and will return a <code>null</code> string.
+    *
+    * @param address
+    *       The address being tested for a match against this policy.
+    * @param type
+    *       The routing type associated with the address under test.
+    *
+    * @return the resulting first matcher to match on the given address.
+    */
+   public String getFirstMatchingAddressPattern(String address, RoutingType 
type) {
+      if (RoutingType.MULTICAST.equals(type)) {
+         for (AddressMatcher matcher : excludesMatchers) {
+            if (matcher.test(address)) {
+               return null;
+            }
+         }
+
+         for (AddressMatcher matcher : includesMatchers) {
+            if (matcher.test(address)) {
+               return matcher.getMatcherPattern();
+            }
+         }
+      }
+
+      return null;
+   }
+
    private static class AddressMatcher implements Predicate<String> {
 
+      private String matchPattern;
       private final Predicate<String> matcher;
 
       AddressMatcher(String address, WildcardConfiguration wildcardConfig) {
          if (address == null || address.isEmpty()) {
+            matchPattern = wildcardConfig.getAnyWordsString();
             matcher = (target) -> true;
          } else {
+            matchPattern = address;
             matcher = new Match<>(address, null, 
wildcardConfig).getPattern().asPredicate();
          }
       }
 
+      public String getMatcherPattern() {
+         return matchPattern;
+      }
+
       @Override
       public boolean test(String address) {
          return matcher.test(address);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
index 1868c0de5e..91e13b82bd 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationPolicySupportTest.java
@@ -57,6 +57,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -224,13 +225,13 @@ public class AMQPFederationPolicySupportTest {
       properties2.put("amqpCredits", 10);
       properties2.put("amqpLowCredits", 3);
 
-      doTestEncodeReceiveFromAddressPolicy("test", false, 0, 1, 2, true, 
includes, excludes, properties1);
-      doTestEncodeReceiveFromAddressPolicy("test", true, 1, 3, 2, false, 
includes, excludes, null);
-      doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false, 
includes, excludes, properties2);
-      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
includes, excludes, null);
-      doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false, 
null, excludes, properties2);
-      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
includes, null, null);
-      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+      doTestEncodeReceiveFromAddressPolicy("test", false, 0, 1, 2, true, 
false, includes, excludes, properties1);
+      doTestEncodeReceiveFromAddressPolicy("test", true, 1, 3, 2, false, true, 
includes, excludes, null);
+      doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false, 
false, includes, excludes, properties2);
+      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
true, includes, excludes, null);
+      doTestEncodeReceiveFromAddressPolicy("test", false, 2, 4, -1, false, 
true, null, excludes, properties2);
+      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
false, includes, null, null);
+      doTestEncodeReceiveFromAddressPolicy("test", true, 7, -1, 255, true, 
true, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
    }
 
    @SuppressWarnings("unchecked")
@@ -240,12 +241,13 @@ public class AMQPFederationPolicySupportTest {
                                                      long 
autoDeleteMessageCount,
                                                      int maxHops,
                                                      boolean 
enableDivertBindings,
+                                                     boolean 
allowWildcardGroupings,
                                                      Collection<String> 
includes,
                                                      Collection<String> 
excludes,
                                                      Map<String, Object> 
policyProperties) {
       final FederationReceiveFromAddressPolicy policy = new 
FederationReceiveFromAddressPolicy(
-         name, autoDelete, autoDeleteDelay, autoDeleteMessageCount, maxHops,
-         enableDivertBindings, includes, excludes, policyProperties, null, 
DEFAULT_WILDCARD_CONFIGURATION);
+         name, autoDelete, autoDeleteDelay, autoDeleteMessageCount, maxHops, 
enableDivertBindings,
+         allowWildcardGroupings, includes, excludes, policyProperties, null, 
DEFAULT_WILDCARD_CONFIGURATION);
 
       final AMQPMessage message = 
AMQPFederationPolicySupport.encodeAddressPolicyControlMessage(policy);
 
@@ -263,6 +265,7 @@ public class AMQPFederationPolicySupportTest {
       assertEquals(autoDeleteMessageCount, 
policyMap.get(ADDRESS_AUTO_DELETE_MSG_COUNT));
       assertEquals(maxHops, policyMap.get(ADDRESS_MAX_HOPS));
       assertEquals(enableDivertBindings, 
policyMap.get(ADDRESS_ENABLE_DIVERT_BINDINGS));
+      assertEquals(allowWildcardGroupings, 
policyMap.get(ADDRESS_ALLOW_WILDCARD_GROUPINGS));
 
       if (includes == null || includes.isEmpty()) {
          assertFalse(policyMap.containsKey(ADDRESS_INCLUDES));
@@ -420,11 +423,11 @@ public class AMQPFederationPolicySupportTest {
       properties.put("amqpCredits", "10");
       properties.put("amqpLowCredits", "3");
 
-      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, includes, excludes, null);
-      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, includes, excludes, properties);
-      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, null, excludes, null);
-      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, includes, null, properties);
-      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
+      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, false, includes, excludes, null);
+      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, true, includes, excludes, properties);
+      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, true, null, excludes, null);
+      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, false, includes, null, properties);
+      doTestDecodeReceiveFromAddressPolicy("address", "test", false, 0, 1, 2, 
true, true, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyMap());
    }
 
    private void doTestDecodeReceiveFromAddressPolicy(String address, String 
name,
@@ -433,6 +436,7 @@ public class AMQPFederationPolicySupportTest {
                                                      long 
autoDeleteMessageCount,
                                                      int maxHops,
                                                      boolean 
enableDivertBindings,
+                                                     boolean 
allowWildcardGroupings,
                                                      Collection<String> 
includes,
                                                      Collection<String> 
excludes,
                                                      Map<String, String> 
policyProperties) throws ActiveMQException {
@@ -453,6 +457,7 @@ public class AMQPFederationPolicySupportTest {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, autoDeleteMessageCount);
       policyMap.put(ADDRESS_MAX_HOPS, maxHops);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, enableDivertBindings);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, allowWildcardGroupings);
 
       if (includes != null && !includes.isEmpty()) {
          policyMap.put(ADDRESS_INCLUDES, new ArrayList<>(includes));
@@ -469,8 +474,8 @@ public class AMQPFederationPolicySupportTest {
       final FederationReceiveFromAddressPolicy policy =
          
AMQPFederationPolicySupport.decodeReceiveFromAddressPolicy(amqpMessage, 
DEFAULT_WILDCARD_CONFIGURATION);
 
-      checkPolicyMatchesExpectations(policy, name, autoDelete, 
autoDeleteDelay, autoDeleteMessageCount,
-                                     maxHops, enableDivertBindings, includes, 
excludes, policyProperties);
+      checkPolicyMatchesExpectations(policy, name, autoDelete, 
autoDeleteDelay, autoDeleteMessageCount, maxHops,
+                                     enableDivertBindings, 
allowWildcardGroupings, includes, excludes, policyProperties);
    }
 
    @Test
@@ -572,12 +577,12 @@ public class AMQPFederationPolicySupportTest {
       properties.put("amqpCredits", "10");
       properties.put("amqpLowCredits", "3");
 
-      doTestCreateAddressPolicyFromConfigurationElement("test", false, 0, 1, 
2, true, includes, excludes, null);
-      doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 2, 3, 
true, includes, excludes, properties);
-      doTestCreateAddressPolicyFromConfigurationElement("test", false, 10, 9, 
8, false, null, excludes, properties);
-      doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 1, 1, 
false, includes, null, null);
-      doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1, 
1, true, null, null, properties);
-      doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1, 
1, true, Collections.emptySet(), Collections.emptySet(), 
Collections.emptyMap());
+      doTestCreateAddressPolicyFromConfigurationElement("test", false, 0, 1, 
2, true, false, includes, excludes, null);
+      doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 2, 3, 
true, true, includes, excludes, properties);
+      doTestCreateAddressPolicyFromConfigurationElement("test", false, 10, 9, 
8, false, true, null, excludes, properties);
+      doTestCreateAddressPolicyFromConfigurationElement("test", true, 1, 1, 1, 
false, false, includes, null, null);
+      doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1, 
1, true, true, null, null, properties);
+      doTestCreateAddressPolicyFromConfigurationElement("test", false, 7, 1, 
1, true, false, Collections.emptySet(), Collections.emptySet(), 
Collections.emptyMap());
    }
 
    private void doTestCreateAddressPolicyFromConfigurationElement(String name,
@@ -586,6 +591,7 @@ public class AMQPFederationPolicySupportTest {
                                                                   long 
autoDeleteMessageCount,
                                                                   int maxHops,
                                                                   boolean 
enableDivertBindings,
+                                                                  boolean 
allowWildcardGroupings,
                                                                   
Collection<String> includes,
                                                                   
Collection<String> excludes,
                                                                   Map<String, 
Object> policyProperties) throws ActiveMQException {
@@ -598,6 +604,7 @@ public class AMQPFederationPolicySupportTest {
       element.setAutoDeleteMessageCount(autoDeleteMessageCount);
       element.setMaxHops(maxHops);
       element.setEnableDivertBindings(enableDivertBindings);
+      element.setAllowWildcardGroupings(allowWildcardGroupings);
       element.setProperties(policyProperties);
 
       if (includes != null) {
@@ -611,12 +618,13 @@ public class AMQPFederationPolicySupportTest {
       final FederationReceiveFromAddressPolicy policy = 
AMQPFederationPolicySupport.create(element, DEFAULT_WILDCARD_CONFIGURATION);
 
       checkPolicyMatchesExpectations(policy, name, autoDelete, 
autoDeleteDelay, autoDeleteMessageCount, maxHops,
-                                     enableDivertBindings, includes, excludes, 
policyProperties);
+                                     enableDivertBindings, 
allowWildcardGroupings, includes, excludes, policyProperties);
    }
 
    private void 
checkPolicyMatchesExpectations(FederationReceiveFromAddressPolicy policy,
                                                String name, boolean 
autoDelete, long autoDeleteDelay,
-                                               long autoDeleteMessageCount, 
int maxHops, boolean enableDivertBindings,
+                                               long autoDeleteMessageCount, 
int maxHops,
+                                               boolean enableDivertBindings, 
boolean enableWildcardSubscriptions,
                                                Collection<?> includes, 
Collection<?> excludes,
                                                Map<String, ?> 
policyProperties) {
       assertEquals(name, policy.getPolicyName());
@@ -625,6 +633,7 @@ public class AMQPFederationPolicySupportTest {
       assertEquals(autoDeleteMessageCount, policy.getAutoDeleteMessageCount());
       assertEquals(maxHops, policy.getMaxHops());
       assertEquals(enableDivertBindings, policy.isEnableDivertBindings());
+      assertEquals(enableWildcardSubscriptions, 
policy.isAllowWildcardGroupings());
 
       if (includes == null || includes.isEmpty()) {
          assertTrue(policy.getIncludes().isEmpty());
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
index 6ee4351fb7..6f3ef9b64c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElement.java
@@ -39,6 +39,7 @@ public final class AMQPFederationAddressPolicyElement 
implements Serializable {
    private Long autoDeleteMessageCount;
    private int maxHops;
    private Boolean enableDivertBindings;
+   private Boolean allowWildcardGroupings;
    private TransformerConfiguration transformerConfig;
 
    public String getName() {
@@ -173,6 +174,15 @@ public final class AMQPFederationAddressPolicyElement 
implements Serializable {
       return transformerConfig;
    }
 
+   public Boolean isAllowWildcardGroupings() {
+      return allowWildcardGroupings;
+   }
+
+   public AMQPFederationAddressPolicyElement setAllowWildcardGroupings(Boolean 
allowWildcardGroupings) {
+      this.allowWildcardGroupings = allowWildcardGroupings;
+      return this;
+   }
+
    @Override
    public boolean equals(Object obj) {
       if (this == obj) {
@@ -190,14 +200,15 @@ public final class AMQPFederationAddressPolicyElement 
implements Serializable {
              Objects.equals(autoDeleteDelay, other.autoDeleteDelay) &&
              Objects.equals(autoDeleteMessageCount, 
other.autoDeleteMessageCount) &&
              Objects.equals(enableDivertBindings, other.enableDivertBindings) 
&&
+             Objects.equals(allowWildcardGroupings, 
other.allowWildcardGroupings) &&
              Objects.equals(transformerConfig, other.transformerConfig) &&
              maxHops == other.maxHops;
    }
 
    @Override
    public int hashCode() {
-      return Objects.hash(name, includes, excludes, properties, autoDelete, 
autoDeleteDelay,
-                          autoDeleteMessageCount, maxHops, 
enableDivertBindings, transformerConfig);
+      return Objects.hash(name, includes, excludes, properties, autoDelete, 
autoDeleteDelay, autoDeleteMessageCount,
+                          maxHops, enableDivertBindings, 
allowWildcardGroupings, transformerConfig);
    }
 
    // We are required to implement a named match type so that we can perform 
this configuration
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 7927aa6b73..580355d4ac 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2349,6 +2349,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
             
config.setAutoDeleteMessageCount(MINUS_ONE_OR_GE_ZERO.validate("auto-delete-message-count",
 Long.parseLong(item.getNodeValue())).longValue());
          } else if (item.getNodeName().equals("enable-divert-bindings")) {
             
config.setEnableDivertBindings(Boolean.parseBoolean(item.getNodeValue()));
+         } else if (item.getNodeName().equals("allow-wildcard-groupings")) {
+            
config.setAllowWildcardGroupings(Boolean.parseBoolean(item.getNodeValue()));
          }
       }
 
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 63c6cf5321..2af3e1b144 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2462,6 +2462,7 @@
       <xsd:attribute name="max-hops" type="xsd:int" use="optional" />
       <xsd:attribute name="name" type="xsd:ID" use="required" />
       <xsd:attribute name="enable-divert-bindings" type="xsd:boolean" 
use="optional" />
+      <xsd:attribute name="allow-wildcard-groupings" type="xsd:boolean" 
use="optional" />
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
index f8fab0db19..28457ee8e3 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederationAddressPolicyElementTest.java
@@ -120,5 +120,13 @@ public class AMQPFederationAddressPolicyElementTest {
       config2.setMaxHops(10);
       assertEquals(config1, config2);
       assertEquals(config1.hashCode(), config2.hashCode());
+
+      // Allow receivers to nest under remote wildcard addresses
+      config1.setAllowWildcardGroupings(true);
+      assertNotEquals(config1, config2);
+      assertNotEquals(config1.hashCode(), config2.hashCode());
+      config2.setAllowWildcardGroupings(true);
+      assertEquals(config1, config2);
+      assertEquals(config1.hashCode(), config2.hashCode());
    }
 }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 8b2bf1f4ad..fd4f1fc701 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -469,6 +469,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       insertionOrderedProperties.put("AMQPConnections.target.federations.abc." 
+ policyType + ".policy2.includes.m4.addressMatch", "y");
       insertionOrderedProperties.put("AMQPConnections.target.federations.abc." 
+ policyType + ".policy2.excludes.m5.addressMatch", "z");
       insertionOrderedProperties.put("AMQPConnections.target.federations.abc." 
+ policyType + ".policy2.enableDivertBindings", "true");
+      insertionOrderedProperties.put("AMQPConnections.target.federations.abc." 
+ policyType + ".policy2.allowWildcardGroupings", "true");
       insertionOrderedProperties.put("AMQPConnections.target.federations.abc." 
+ policyType + ".policy2.properties.a", "b");
 
       configuration.parsePrefixedProperties(insertionOrderedProperties, null);
@@ -517,6 +518,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertEquals(42L, 
addressPolicy1.getAutoDeleteMessageCount().longValue());
       assertEquals(10000L, addressPolicy1.getAutoDeleteDelay().longValue());
       assertNull(addressPolicy1.isEnableDivertBindings());
+      assertNull(addressPolicy1.isAllowWildcardGroupings());
       assertTrue(addressPolicy1.getProperties().isEmpty());
 
       addressPolicy1.getIncludes().forEach(match -> {
@@ -536,6 +538,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertNull(addressPolicy2.getAutoDeleteMessageCount());
       assertNull(addressPolicy2.getAutoDeleteDelay());
       assertTrue(addressPolicy2.isEnableDivertBindings());
+      assertTrue(addressPolicy2.isAllowWildcardGroupings());
       assertFalse(addressPolicy2.getProperties().isEmpty());
       assertEquals("b", addressPolicy2.getProperties().get("a"));
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 88a7619c8f..ba33b5a739 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -501,7 +501,7 @@
                   <property key="amqpCredits" value="2"/>
                   <property key="amqpLowCredits" value="1"/>
                </remote-queue-policy>
-               <local-address-policy name="lap1" auto-delete="false" 
auto-delete-delay="1" auto-delete-message-count="12" max-hops="2" 
enable-divert-bindings="true">
+               <local-address-policy name="lap1" auto-delete="false" 
auto-delete-delay="1" auto-delete-message-count="12" max-hops="2" 
enable-divert-bindings="true" allow-wildcard-groupings="true">
                   <include address-match="orders" />
                   <exclude address-match="all.#" />
                   <transformer-class-name>class-name</transformer-class-name>
@@ -510,7 +510,7 @@
                   <include address-match="address" queue-match="theQueue" />
                   
<transformer-class-name>class-another</transformer-class-name>
                </local-queue-policy>
-               <remote-address-policy name="rap1" auto-delete="true" 
auto-delete-delay="2" auto-delete-message-count="42" max-hops="1" 
enable-divert-bindings="false">
+               <remote-address-policy name="rap1" auto-delete="true" 
auto-delete-delay="2" auto-delete-message-count="42" max-hops="1" 
enable-divert-bindings="false" allow-wildcard-groupings="false">
                   <include address-match="support" />
                   <property key="amqpCredits" value="2"/>
                   <property key="amqpLowCredits" value="1"/>
diff --git a/docs/user-manual/amqp-federation-configuration-glossary.adoc 
b/docs/user-manual/amqp-federation-configuration-glossary.adoc
index 3a12309981..9f5be0b09c 100644
--- a/docs/user-manual/amqp-federation-configuration-glossary.adoc
+++ b/docs/user-manual/amqp-federation-configuration-glossary.adoc
@@ -92,6 +92,7 @@ In this case the policy is sent across the connection to the 
remote broker and f
                              auto-delete-delay="30000"
                              auto-delete-message-count="0"
                              max-hops="1"
+                             allow-wildcard-groupings="false"
                              enable-divert-bindings="true">
          <include address-match="local-address.#" />
          <exclude address-match="local-address.excluded" />
@@ -127,6 +128,11 @@ enable-divert-bindings::
 Setting to `true` enables divert bindings to be listened-to for demand.
 If a divert binding with an address matches the included addresses for the 
address policy, any queue bindings that match the forwarding address of the 
divert creates demand.
 The default value is `false`.
+allow-wildcard-groupings::
+Setting to 'true' enables the address federation policy matches to define if a 
remote wildcard address will be used to host the address receiver bindings for 
the matching addresses if the match value is itself a wildcard.
+This can be useful if federating large numbers of addresses that have low 
traffic as it reduces the number of addresses that need to exist on the remote 
federation target at the expense of higher CPU load on the target in order to 
route messages to the appropriate wildcard bindings for the matched addresses.
+The default value is `false` and the federation receiver address bindings will 
be attached to the matching addresses on the remote.
+When enabled the remote broker must have wildcard routing enabled for this 
feature to work correctly.
 
 The following set of elements can be configured within the address policy to 
control the behavior of the deployed policy.
 For the policy to actually match any addresses on the target broker an include 
must be provided at a minimum.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 71b8d48117..1cd0c1fefd 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -106,6 +107,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
@@ -2035,7 +2037,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, excludes, null, null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2081,7 +2083,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, excludes, null, 
transformerConfiguration,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2119,7 +2121,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2189,7 +2191,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2261,7 +2263,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2335,7 +2337,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -2414,7 +2416,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
transformerConfiguration,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -3531,6 +3533,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
 
       final EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher(policyMap);
@@ -3617,6 +3620,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
 
       final EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher(policyMap);
@@ -6705,7 +6709,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, -1, true,
+                                                true, 30_000L, 1000L, -1, 
true, false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -7063,7 +7067,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, false,
+                                                true, 30_000L, 1000L, 1, 
false, false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -7169,7 +7173,7 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
       final FederationReceiveFromAddressPolicy policy =
          new FederationReceiveFromAddressPolicy("test-address-policy",
-                                                true, 30_000L, 1000L, 1, true,
+                                                true, 30_000L, 1000L, 1, true, 
false,
                                                 includes, null, properties, 
null,
                                                 
DEFAULT_WILDCARD_CONFIGURATION);
 
@@ -7546,6 +7550,493 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testWildcardAddressUedForFederationSubscriptionWhenEnabled() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respond()
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.expectAttach().ofReceiver()
+                            .withSource().withDynamic(true)
+                            .and()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind()
+                            .withTarget().withAddress("test-dynamic-events");
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final String wildcardAddress = getTestName() + ".#";
+         final String addressA = getTestName() + ".A";
+         final String addressB = getTestName() + ".B";
+
+         final String expectedJMSFilterA = "AMQAddress='" + addressA + "'";
+         final String expectedJMSFilterB = "AMQAddress='" + addressB + "'";
+         final AtomicReference<Attach> capturedAttachA = new 
AtomicReference<>();
+         final AtomicReference<Attach> capturedAttachB = new 
AtomicReference<>();
+
+         final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+         final 
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong 
jmsSelectorCode =
+            
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+         final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(wildcardAddress);
+         receiveFromAddress.setAutoDelete(true);
+         receiveFromAddress.setAutoDeleteDelay(10_000L);
+         receiveFromAddress.setAutoDeleteMessageCount(-1L);
+         receiveFromAddress.setAllowWildcardGroupings(true);
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalAddressPolicy(receiveFromAddress);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         // A first binding on the wildcard address what collects only 
messages for address A
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withCapture(attach -> capturedAttachA.set(attach))
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString(addressA),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            
.withSource().withAddress(startsWith(wildcardAddress)).also()
+                            .withTarget().withAddress(addressA).also()
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumerA = 
session.createConsumer(session.createTopic(addressA));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            // A second binding on the wildcard address what collects only 
messages for address B
+            peer.expectAttach().ofReceiver()
+                               
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                               .withCapture(attach -> 
capturedAttachB.set(attach))
+                               .withName(allOf(containsString(getTestName()),
+                                               containsString(addressB),
+                                               
containsString("address-receiver"),
+                                               
containsString(server.getNodeID().toString())))
+                               
.withSource().withAddress(startsWith(wildcardAddress)).also()
+                               .withTarget().withAddress(addressB).also()
+                               .respond()
+                               
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+            peer.expectFlow().withLinkCredit(1000);
+
+            final MessageConsumer consumerB = 
session.createConsumer(session.createTopic(addressB));
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            final Map<Symbol, Object> filtersMapA = 
capturedAttachA.get().getSource().getFilter();
+
+            assertNotNull(filtersMapA);
+            assertTrue(filtersMapA.containsKey(jmsSelectorKey));
+            final DescribedType jmsSelectorEntryA = (DescribedType) 
filtersMapA.get(jmsSelectorKey);
+            assertNotNull(jmsSelectorEntryA);
+            assertEquals(jmsSelectorEntryA.getDescriptor(), jmsSelectorCode);
+            assertEquals(jmsSelectorEntryA.getDescribed().toString(), 
expectedJMSFilterA);
+
+            final Map<Symbol, Object> filtersMapB = 
capturedAttachB.get().getSource().getFilter();
+
+            assertNotNull(filtersMapB);
+            assertTrue(filtersMapB.containsKey(jmsSelectorKey));
+            final DescribedType jmsSelectorEntryB = (DescribedType) 
filtersMapB.get(jmsSelectorKey);
+            assertNotNull(jmsSelectorEntryB);
+            assertEquals(jmsSelectorEntryB.getDescriptor(), jmsSelectorCode);
+            assertEquals(jmsSelectorEntryB.getDescribed().toString(), 
expectedJMSFilterB);
+
+            // Route some messages to check they arrive on the local consumers
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(0)
+                                 .withBody().withValue("Address A").also()
+                                 .now();
+            peer.remoteTransfer().withHandle(3)
+                                 .withDeliveryId(1)
+                                 .withBody().withValue("Address B").also()
+                                 .later(10);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            final TextMessage messageA = (TextMessage) 
consumerA.receiveNoWait();
+            final TextMessage messageB = (TextMessage) 
consumerB.receiveNoWait();
+
+            assertNotNull(messageA);
+            assertNotNull(messageB);
+
+            assertEquals("Address A", messageA.getText());
+            assertEquals("Address B", messageB.getText());
+
+            assertNull(consumerA.receiveNoWait());
+            assertNull(consumerB.receiveNoWait());
+
+            peer.close();
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFallbackToLegacySubscriptionIfNoFQQNSupportAndWildcardSubscriptionsGroupsEnabled()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respond() // Respond with no V2 property
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.expectAttach().ofReceiver()
+                            .withSource().withDynamic(true)
+                            .and()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind()
+                            .withTarget().withAddress("test-dynamic-events");
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final String wildcardAddress = getTestName() + ".#";
+         final String addressA = getTestName() + ".A";
+         final String addressB = getTestName() + ".B";
+
+         final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(wildcardAddress);
+         receiveFromAddress.setAutoDelete(true);
+         receiveFromAddress.setAutoDeleteDelay(10_000L);
+         receiveFromAddress.setAutoDeleteMessageCount(-1L);
+         receiveFromAddress.setAllowWildcardGroupings(true);
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalAddressPolicy(receiveFromAddress);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         // Binding on the matching address since no FQQN support
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString(addressA),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .withSource().withAddress(addressA).also()
+                            .withTarget().withAddress(addressA).also()
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumerA = 
session.createConsumer(session.createTopic(addressA));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            // Binding on the matching address since no FQQN support
+            peer.expectAttach().ofReceiver()
+                               
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                               .withName(allOf(containsString(getTestName()),
+                                               containsString(addressB),
+                                               
containsString("address-receiver"),
+                                               
containsString(server.getNodeID().toString())))
+                               .withSource().withAddress(addressB).also()
+                               .withTarget().withAddress(addressB).also()
+                               .respond()
+                               
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+            peer.expectFlow().withLinkCredit(1000);
+
+            final MessageConsumer consumerB = 
session.createConsumer(session.createTopic(addressB));
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            // Route some messages to check they arrive on the local consumers
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(0)
+                                 .withBody().withValue("Address A").also()
+                                 .now();
+            peer.remoteTransfer().withHandle(3)
+                                 .withDeliveryId(1)
+                                 .withBody().withValue("Address B").also()
+                                 .later(10);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            final TextMessage messageA = (TextMessage) 
consumerA.receiveNoWait();
+            final TextMessage messageB = (TextMessage) 
consumerB.receiveNoWait();
+
+            assertNotNull(messageA);
+            assertNotNull(messageB);
+
+            assertEquals("Address A", messageA.getText());
+            assertEquals("Address B", messageB.getText());
+
+            assertNull(consumerA.receiveNoWait());
+            assertNull(consumerB.receiveNoWait());
+
+            peer.close();
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationTreatsNonWildcardMatchAsNormalWhenWildcardSubscriptionsSetToAllowed()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respond() // Need V2 for wildcard subscription to 
even occur
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.expectAttach().ofReceiver()
+                            .withSource().withDynamic(true)
+                            .and()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind()
+                            .withTarget().withAddress("test-dynamic-events");
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(getTestName());
+         receiveFromAddress.addToIncludes("unmatched.a.#");
+         receiveFromAddress.addToIncludes("unmatched.*.b");
+         receiveFromAddress.setAllowWildcardGroupings(true); // Allowed but 
match will not be on any of the wildcards
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalAddressPolicy(receiveFromAddress);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .withSource().withAddress(startsWith(getTestName() 
+ "::")).also() // FQQN Address exact match
+                            .withTarget().withAddress(getTestName()).also()
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.close();
+         }
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationAttachesToRemoteBrokerWithWildcardRoutingDisabled() throws 
Exception {
+      // Disable wildcard routing to check / demonstrate that mis-configured 
federation topologies
+      // can fail to federate if wild card subscription grouping is enabled 
but the brokers can't do
+      // wildcard routing for those subscriptions.
+      server.getConfiguration().setWildCardConfiguration(new 
WildcardConfiguration().setRoutingEnabled(false));
+      server.start();
+
+      final String wildcardAddress = getTestName() + ".#";
+      final String actualAddress = getTestName() + ".A";
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test", true);
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withTarget().withAddress(actualAddress).also()
+                                       
.withSource().withAddress(wildcardAddress + "::federation-unique-queue-name");
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withTarget().withAddress(actualAddress).also()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(wildcardAddress + 
"::federation-unique-queue-name")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         Wait.assertTrue(() -> 
server.addressQuery(SimpleString.of(wildcardAddress)).isExists());
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic(actualAddress));
+
+            connection.start();
+
+            // We adon't expect the federation to get this as the remote 
broker has disabled wildcard
+            // routing but does not reject subscriptions on wildcard addresses 
in this configuration.
+            // This is a case where user has not properly configured their 
federation servers.
+            producer.send(session.createTextMessage("test-message"));
+         }
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationReceiverAttachedToWildcardAddressWhenAutoCreateAddressIsDisabledAndAddressesStaticallyDefined()
 throws Exception {
+      final AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAutoCreateQueues(true);
+      addressSettings.setAutoCreateAddresses(false);
+
+      final String wildcardAddress = getTestName() + ".#";
+      final String actualAddress = getTestName() + ".A";
+
+      server.getConfiguration().getAddressSettings().put("#", addressSettings);
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of(wildcardAddress), 
RoutingType.MULTICAST));
+      server.addAddressInfo(new AddressInfo(SimpleString.of(actualAddress), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test", true);
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withTarget().withAddress(actualAddress).also()
+                                       
.withSource().withAddress(wildcardAddress + "::federation-unique-queue-name");
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withTarget().withAddress(actualAddress).also()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(wildcardAddress + 
"::federation-unique-queue-name")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().withMessage()
+                              .withHeader().also()
+                              .withMessageAnnotations().also()
+                              .withProperties().also()
+                              .withValue("test-message").and()
+                              .accept();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         Wait.assertTrue(() -> 
server.addressQuery(SimpleString.of(wildcardAddress)).isExists());
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic(actualAddress));
+
+            connection.start();
+
+            producer.send(session.createTextMessage("test-message"));
+         }
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
    protected void configureSecurity(ActiveMQServer server, String allowed, 
String restricted, String... userAllowedOnly) {
       ActiveMQJAASSecurityManager securityManager = 
(ActiveMQJAASSecurityManager) server.getSecurityManager();
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index e84355992e..ad5207323b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -67,6 +67,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_EXCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
@@ -617,6 +618,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 314L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
       policyMap.put(ADDRESS_EXCLUDES, excludes);
 
@@ -690,6 +692,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, 314L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
       policyMap.put(ADDRESS_EXCLUDES, excludes);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
index 53f4837c65..582056ea82 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java
@@ -73,6 +73,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ENABLE_DIVERT_BINDINGS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_ALLOW_WILDCARD_GROUPINGS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_MAX_HOPS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
@@ -1080,6 +1081,7 @@ class AMQPFederationManagementTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
 
       final EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher(policyMap);
@@ -1282,6 +1284,7 @@ class AMQPFederationManagementTest extends 
AmqpClientTestSupport {
       policyMap.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
       policyMap.put(ADDRESS_MAX_HOPS, 5);
       policyMap.put(ADDRESS_ENABLE_DIVERT_BINDINGS, false);
+      policyMap.put(ADDRESS_ALLOW_WILDCARD_GROUPINGS, false);
       policyMap.put(ADDRESS_INCLUDES, includes);
 
       final EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher(policyMap);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 7dd6ec9e47..e5c7a378c5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -2160,4 +2161,293 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
          assertNotNull(consumer.receiveNoWait());
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledCore() throws 
Exception {
+      doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled("CORE");
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledAMQP() throws 
Exception {
+      doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled("AMQP");
+   }
+
+   private void 
doTestAddressFederationOccursUsingWildcardSubscriptionsIfEnabled(String 
protocol) throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final SimpleString wildcardAddress = SimpleString.of(getTestName() + 
".#");
+      final SimpleString fullAddressA = SimpleString.of(getTestName() + ".A");
+      final SimpleString fullAddressB = SimpleString.of(getTestName() + ".B");
+      final SimpleString unmatchedAddress = SimpleString.of(getTestName() + 
"NoMatch");
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy.setName("local-test-policy");
+      localAddressPolicy.addToIncludes(wildcardAddress.toString());
+      localAddressPolicy.setAutoDelete(false);
+      localAddressPolicy.setAutoDeleteDelay(-1L);
+      localAddressPolicy.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+      localAddressPolicy.setAllowWildcardGroupings(true);
+
+      final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+      element.setName(getTestName() + ":Wildcards");
+      element.addLocalAddressPolicy(localAddressPolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection1 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection1.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection1);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + 
SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topicA = sessionL.createTopic(fullAddressA.toString());
+         final Topic topicB = sessionL.createTopic(fullAddressB.toString());
+         final Topic topicUnmatched = 
sessionL.createTopic(unmatchedAddress.toString());
+
+         final MessageConsumer consumerL = 
sessionL.createConsumer(topicUnmatched);
+         final MessageConsumer consumerL_A = sessionL.createConsumer(topicA);
+         final MessageConsumer consumerL_B = sessionL.createConsumer(topicB);
+         final MessageProducer producerR = sessionR.createProducer(null);
+
+         connectionL.start();
+         connectionR.start();
+
+         // Local broker should see the full address but the remote should 
only have a wildcard subscription
+         // The wildcard subscription should have two bindings since we have 
two matching consumers
+         Wait.assertTrue(() -> server.bindingQuery(fullAddressA, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> server.bindingQuery(fullAddressB, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(wildcardAddress).isExists());
+         Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress, 
false).getQueueNames().size() == 2, 10_000, 50);
+
+         assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+         assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+
+         final TextMessage remoteMessage = 
sessionR.createTextMessage(getTestName());
+
+         producerR.send(topicA, remoteMessage);
+
+         final TextMessage localMessageA = (TextMessage) 
consumerL_A.receive(1_000);
+
+         assertNotNull(localMessageA);
+         assertNull(consumerL_B.receiveNoWait());
+
+         producerR.send(topicB, remoteMessage);
+
+         final TextMessage localMessageB = (TextMessage) 
consumerL_B.receive(1_000);
+
+         assertNotNull(localMessageB);
+         assertNull(consumerL_A.receiveNoWait());
+
+         consumerL_A.close();
+         consumerL_B.close();
+
+         Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress, 
false).getQueueNames().size() == 0, 10_000, 50);
+
+         assertNull(consumerL.receiveNoWait());
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederationOccursUsingWildcardSubscriptionsIfEnabledUsingSingleWordMatcher()
 throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final SimpleString wildcardAddress = SimpleString.of(getTestName() + 
".*.region");
+      final SimpleString fullAddressA = SimpleString.of(getTestName() + 
".A.region");
+      final SimpleString fullAddressB = SimpleString.of(getTestName() + 
".B.region");
+      final SimpleString unmatchedAddress = SimpleString.of(getTestName() + 
".C.regions");
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy.setName("local-test-policy");
+      localAddressPolicy.addToIncludes(wildcardAddress.toString());
+      localAddressPolicy.setAutoDelete(false);
+      localAddressPolicy.setAutoDeleteDelay(-1L);
+      localAddressPolicy.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+      localAddressPolicy.setAllowWildcardGroupings(true);
+
+      final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+      element.setName(getTestName() + ":Wildcards");
+      element.addLocalAddressPolicy(localAddressPolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection1 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection1.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection1);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topicA = sessionL.createTopic(fullAddressA.toString());
+         final Topic topicB = sessionL.createTopic(fullAddressB.toString());
+         final Topic topicUnmatched = 
sessionL.createTopic(unmatchedAddress.toString());
+
+         final MessageConsumer consumerL = 
sessionL.createConsumer(topicUnmatched);
+         final MessageConsumer consumerL_A = sessionL.createConsumer(topicA);
+         final MessageConsumer consumerL_B = sessionL.createConsumer(topicB);
+         final MessageProducer producerR = sessionR.createProducer(null);
+
+         connectionL.start();
+         connectionR.start();
+
+         // Local broker should see the full address but the remote should 
only have a wildcard subscription
+         // The wildcard subscription should have two bindings since we have 
two matching consumers
+         Wait.assertTrue(() -> server.bindingQuery(fullAddressA, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> server.bindingQuery(fullAddressB, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(wildcardAddress).isExists());
+         Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress, 
false).getQueueNames().size() == 2, 10_000, 50);
+
+         assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+         assertFalse(remoteServer.addressQuery(fullAddressA).isExists());
+
+         final TextMessage remoteMessage = 
sessionR.createTextMessage(getTestName());
+
+         producerR.send(topicA, remoteMessage);
+
+         final TextMessage localMessageA = (TextMessage) 
consumerL_A.receive(1_000);
+
+         assertNotNull(localMessageA);
+         assertNull(consumerL_B.receiveNoWait());
+
+         producerR.send(topicB, remoteMessage);
+
+         final TextMessage localMessageB = (TextMessage) 
consumerL_B.receive(1_000);
+
+         assertNotNull(localMessageB);
+         assertNull(consumerL_A.receiveNoWait());
+
+         consumerL_A.close();
+         consumerL_B.close();
+
+         Wait.assertTrue(() -> remoteServer.bindingQuery(wildcardAddress, 
false).getQueueNames().size() == 0, 10_000, 50);
+
+         assertNull(consumerL.receiveNoWait());
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteAddressFederationAppliesConsumerFiltersIfEnabledEvenWithWildcardSubscriptions()
 throws Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final SimpleString wildcardAddress = SimpleString.of(getTestName() + 
".#");
+      final SimpleString fullAddressA = SimpleString.of(getTestName() + ".A");
+
+      final AMQPFederationAddressPolicyElement remoteAddressPolicy = new 
AMQPFederationAddressPolicyElement();
+      remoteAddressPolicy.setName("test-policy");
+      remoteAddressPolicy.addToIncludes(wildcardAddress.toString());
+      remoteAddressPolicy.setAutoDelete(false);
+      remoteAddressPolicy.setAutoDeleteDelay(-1L);
+      remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
+      remoteAddressPolicy.setAllowWildcardGroupings(true);
+      remoteAddressPolicy.addProperty(IGNORE_ADDRESS_BINDING_FILTERS, 
String.valueOf(false));
+
+      final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addRemoteAddressPolicy(remoteAddressPolicy);
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "failover:(amqp://localhost:" + 
SERVER_PORT_REMOTE + ")");
+
+      final Connection connectionL = factoryLocal.createConnection();
+      final Connection connectionR = factoryRemote.createConnection();
+
+      final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+      final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+      final Topic topic = sessionR.createTopic(fullAddressA.toString());
+
+      final MessageConsumer consumerR = sessionR.createConsumer(topic, 
"color='red'");
+
+      connectionL.start();
+      connectionR.start();
+
+      Wait.assertTrue(() -> server.addressQuery(wildcardAddress).isExists());
+      Wait.assertTrue(() -> server.bindingQuery(wildcardAddress, 
false).getQueueNames().size() == 1);
+
+      Wait.assertTrue(() -> 
remoteServer.addressQuery(fullAddressA).isExists());
+      Wait.assertTrue(() -> remoteServer.bindingQuery(fullAddressA, 
false).getQueueNames().size() == 1);
+
+      try {
+         final MessageProducer producerL = sessionL.createProducer(topic);
+         final TextMessage message = sessionL.createTextMessage();
+
+         message.setText("First Red Message");
+         message.setStringProperty("color", "red");
+         producerL.send(message);
+
+         // Message that matched consumer filter should federate
+         final Message received1 = consumerR.receive(5_000);
+
+         assertNotNull(received1);
+         assertInstanceOf(TextMessage.class, received1);
+         assertEquals("First Red Message", ((TextMessage) 
received1).getText());
+         assertTrue(received1.propertyExists("color"));
+         assertEquals("red", received1.getStringProperty("color"));
+
+         // should be filtered and not sent over the federation link from the 
remote server.
+         message.setText("Hello World Blue");
+         message.setStringProperty("color", "blue");
+         producerL.send(message);
+
+         message.setText("Second Red Message");
+         message.setStringProperty("color", "red");
+         producerL.send(message);
+
+         final Message received2 = consumerR.receive(5_000);
+
+         assertNotNull(received2);
+         assertInstanceOf(TextMessage.class, received2);
+         assertEquals("Second Red Message", ((TextMessage) 
received2).getText());
+         assertTrue(received2.propertyExists("color"));
+         assertEquals("red", received2.getStringProperty("color"));
+
+         // Should be no more messages
+         assertNull(consumerR.receiveNoWait());
+
+         final org.apache.activemq.artemis.core.server.Queue localQueue =
+            server.locateQueue(server.bindingQuery(wildcardAddress, 
false).getQueueNames().get(0));
+
+         // Filtered message should not be federated so not added on the local 
server widlcard subscription binding.
+         assertEquals(2, localQueue.getMessagesAdded());
+      } finally {
+         connectionL.close();
+         connectionR.close();
+
+         server.stop();
+         remoteServer.stop();
+      }
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to