This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d21c5c50ac ARTEMIS-6011 Wildcard routing / Address full or dropped is 
not properly propagated
d21c5c50ac is described below

commit d21c5c50ac3ceda8cc7d36c72b045eee9f2e7011
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 17 10:28:05 2026 -0400

    ARTEMIS-6011 Wildcard routing / Address full or dropped is not properly 
propagated
    
    Co-authored with Justin Bertram
---
 .../connect/mirror/AMQPMirrorControllerSource.java |   8 +
 .../activemq/artemis/core/paging/PagingStore.java  |   2 +
 .../artemis/core/paging/impl/PagingStoreImpl.java  | 100 +++----
 .../core/postoffice/impl/PostOfficeImpl.java       |  19 ++
 .../artemis/core/server/RouteContextList.java      |   4 +
 .../artemis/core/server/RoutingContext.java        |   7 +-
 .../cluster/impl/RemoteQueueBindingImpl.java       |   4 +-
 .../core/server/impl/RoutingContextImpl.java       |  28 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java |  50 ++++
 .../paging/WildcardAddressFullTest.java            | 301 +++++++++++++++++++++
 .../storage/PersistMultiThreadTest.java            |   5 +
 .../tests/stress/paging/PageCursorStressTest.java  |  12 +-
 .../core/paging/impl/PageTimedWriterUnitTest.java  |   2 +-
 .../core/paging/impl/PagingManagerImplTest.java    |   6 +-
 .../unit/core/paging/impl/PagingStoreImplTest.java |  22 +-
 15 files changed, 486 insertions(+), 84 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b9e018c6e4..ea1ff1cc86 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -27,6 +27,7 @@ import 
org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -787,12 +788,19 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    static class PagedRouteContext implements RouteContextList {
 
+      private final PagingStore addressStore;
       private final List<Queue> durableQueues;
       private final List<Queue> nonDurableQueues;
 
+      @Override
+      public PagingStore getAddressStore() {
+         return addressStore;
+      }
+
       PagedRouteContext(Queue snfQueue) {
          List<Queue> queues = new ArrayList<>(1);
          queues.add(snfQueue);
+         this.addressStore = snfQueue.getPagingStore();
 
          if (snfQueue.isDurable()) {
             durableQueues = queues;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 55775c5b53..898034f7a8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -139,6 +139,8 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
 
    Page usePage(long page);
 
+   boolean checkFullPolicy(Message message) throws Exception;
+
    /**
     * Use this method when you want to use the cache of used pages. If you are 
just using offline (e.g. print-data), use
     * the newPageObject method.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 544b329e2e..4b63e0abc2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1424,66 +1424,50 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
-   public int page(Message message,
-                       final Transaction tx,
-                       RouteContextList listCtx,
-                       Function<Message, Message> pageDecorator,
-                       boolean useFlowControl) throws Exception {
-
-      if (!running) {
-         return -1;
-      }
+   public boolean checkFullPolicy(Message message) throws Exception {
 
       boolean diskFull = pagingManager.isDiskFull();
 
-      if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || 
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
-         if (diskFull) {
-            if (message.isLargeMessage()) {
-               ((LargeServerMessage) message).deleteFile();
-            }
-
-            if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
-               throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
-            }
+      if (diskFull && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || 
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) {
+         if (message.isLargeMessage()) {
+            ((LargeServerMessage) message).deleteFile();
+         }
 
-            // Dist is full, just drop the data
-            if (!printedDropMessagesWarning) {
-               printedDropMessagesWarning = true;
-               ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
-            }
+         if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+            throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+         }
 
-            return 0;
+         // Dist is full, just drop the data
+         if (!printedDropMessagesWarning) {
+            printedDropMessagesWarning = true;
+            ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
          }
-      }
 
-      boolean full = isFull();
+         return false;
+      }
 
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || 
addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-         if (full) {
+      if (isFull()) {
+         if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
             if (message.isLargeMessage()) {
-               ((LargeServerMessage) message).deleteFile();
+               removeLargeMessage(message);
             }
-
-            if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-               throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+            throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+         } else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) 
{
+            if (message.isLargeMessage()) {
+               removeLargeMessage(message);
             }
-
-            // Address is full, we just pretend we are paging, and drop the 
data
+            // storage is full, just drop the data
             if (!printedDropMessagesWarning) {
                printedDropMessagesWarning = true;
                ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
             }
-            return 0;
-         } else {
-            return -1;
+            return false;
          }
-      } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-         return -1;
       }
 
       if (pageFull) {
          if (message.isLargeMessage()) {
-            ((LargeServerMessage) message).deleteFile();
+            removeLargeMessage(message);
          }
 
          if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
@@ -1494,20 +1478,36 @@ public class PagingStoreImpl implements PagingStore {
             printedDropMessagesWarning = true;
             ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
          }
-
-         // we are in page mode, if we got to this point, we are dropping the 
message while still paging
-         // we return 0 as in the storage is in "page mode" however no credits 
are being taken.
-         return 0;
+         return false;
       }
 
-      return writePage(message, tx, listCtx, pageDecorator, useFlowControl);
+      return true;
+   }
+
+   private static void removeLargeMessage(Message message) {
+      try {
+         ((LargeServerMessage) message).deleteFile();
+      } catch (Exception e) {
+         // only thing to be done is log on this case
+         logger.debug("Error deleting large message file for {}", message, e);
+      }
    }
 
-   private int writePage(Message message,
-                             Transaction tx,
-                             RouteContextList listCtx,
-                             Function<Message, Message> pageDecorator,
-                             boolean useFlowControl) throws Exception {
+   @Override
+   public int page(Message message,
+                       final Transaction tx,
+                       RouteContextList listCtx,
+                       Function<Message, Message> pageDecorator,
+                       boolean useFlowControl) throws Exception {
+
+      if (!running) {
+         return -1;
+      }
+
+      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
+         return -1;
+      }
+
       // We need to use a readLock as we need to keep paging until we 
scheduled a task
       // notice that to leave paging you need pending tasks done
       readLock();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 86275ac2ad..d263295a6a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1685,6 +1685,25 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       final SimpleString messageAddress = message.getAddressSimpleString();
       final PagingStore owningStore = 
pagingManager.getPageStore(messageAddress);
       message.setOwner(owningStore);
+      boolean dropMessages = false;
+      if (owningStore != null) {
+         if (!owningStore.checkFullPolicy(message)) {
+            dropMessages = true;
+         }
+      }
+      for (Map.Entry<SimpleString, RouteContextList> entry : 
context.getContexListing().entrySet()) {
+         final PagingStore store = entry.getValue().getAddressStore();
+         if (store != null) {
+            if (!store.checkFullPolicy(message)) {
+               dropMessages = true;
+            }
+         }
+      }
+
+      if (dropMessages) {
+         return;
+      }
+
       for (Map.Entry<SimpleString, RouteContextList> entry : 
context.getContexListing().entrySet()) {
          final PagingStore store;
          if (entry.getKey().equals(messageAddress)) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
index b4f9bd761f..3d1c57cf14 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
@@ -18,11 +18,15 @@ package org.apache.activemq.artemis.core.server;
 
 import java.util.List;
 
+import org.apache.activemq.artemis.core.paging.PagingStore;
+
 /**
  * This is a simple datatype containing the list of a routing context
  */
 public interface RouteContextList {
 
+   PagingStore getAddressStore();
+
    int getNumberOfNonDurableQueues();
 
    int getNumberOfDurableQueues();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index fe9c4f3f1f..c784f84138 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -78,11 +79,11 @@ public interface RoutingContext {
 
    Map<SimpleString, RouteContextList> getContexListing();
 
-   RouteContextList getContextListing(SimpleString address);
+   RouteContextList getContextListing(SimpleString address, PagingStore 
addressStore);
 
-   List<Queue> getNonDurableQueues(SimpleString address);
+   List<Queue> getNonDurableQueues(SimpleString address, PagingStore 
addressStore);
 
-   List<Queue> getDurableQueues(SimpleString address);
+   List<Queue> getDurableQueues(SimpleString address, PagingStore 
addressStore);
 
    int getQueueCount();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 6eda81f6cf..a65866c61e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -182,7 +182,7 @@ public class RemoteQueueBindingImpl implements 
RemoteQueueBinding {
    public void route(final Message message, final RoutingContext context) {
       addRouteContextToMessage(message);
 
-      List<Queue> durableQueuesOnContext = 
context.getDurableQueues(storeAndForwardQueue.getAddress());
+      List<Queue> durableQueuesOnContext = 
context.getDurableQueues(storeAndForwardQueue.getAddress(), 
storeAndForwardQueue.getPagingStore());
 
       if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
          // There can be many remote bindings for the same node, we only want 
to add the message once to
@@ -195,7 +195,7 @@ public class RemoteQueueBindingImpl implements 
RemoteQueueBinding {
    public void routeWithAck(Message message, RoutingContext context) {
       addRouteContextToMessage(message);
 
-      List<Queue> durableQueuesOnContext = 
context.getDurableQueues(storeAndForwardQueue.getAddress());
+      List<Queue> durableQueuesOnContext = 
context.getDurableQueues(storeAndForwardQueue.getAddress(), 
storeAndForwardQueue.getPagingStore());
 
       if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
          // There can be many remote bindings for the same node, we only want 
to add the message once to
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 1f7a7a9dc6..55d8318d68 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -204,7 +205,7 @@ public class RoutingContextImpl implements RoutingContext {
    @Override
    public void addQueue(final SimpleString address, final Queue queue) {
 
-      RouteContextList listing = getContextListing(address);
+      RouteContextList listing = getContextListing(address, 
queue.getPagingStore());
 
       if (queue.isDurable()) {
          listing.getDurableQueues().add(queue);
@@ -257,7 +258,7 @@ public class RoutingContextImpl implements RoutingContext {
    @Override
    public void addQueueWithAck(SimpleString address, Queue queue) {
       addQueue(address, queue);
-      RouteContextList listing = getContextListing(address);
+      RouteContextList listing = getContextListing(address, 
queue.getPagingStore());
       listing.addAckedQueue(queue);
    }
 
@@ -316,10 +317,10 @@ public class RoutingContextImpl implements RoutingContext 
{
    }
 
    @Override
-   public RouteContextList getContextListing(SimpleString address) {
+   public RouteContextList getContextListing(SimpleString address, PagingStore 
addressStore) {
       RouteContextList listing = map.get(address);
       if (listing == null) {
-         listing = new ContextListing();
+         listing = new ContextListing(addressStore);
          map.put(address, listing);
       }
       return listing;
@@ -336,13 +337,13 @@ public class RoutingContextImpl implements RoutingContext 
{
    }
 
    @Override
-   public List<Queue> getNonDurableQueues(SimpleString address) {
-      return getContextListing(address).getNonDurableQueues();
+   public List<Queue> getNonDurableQueues(SimpleString address, PagingStore 
addressStore) {
+      return getContextListing(address, addressStore).getNonDurableQueues();
    }
 
    @Override
-   public List<Queue> getDurableQueues(SimpleString address) {
-      return getContextListing(address).getDurableQueues();
+   public List<Queue> getDurableQueues(SimpleString address, PagingStore 
addressStore) {
+      return getContextListing(address, addressStore).getDurableQueues();
    }
 
    @Override
@@ -368,6 +369,17 @@ public class RoutingContextImpl implements RoutingContext {
 
    public static class ContextListing implements RouteContextList {
 
+      public ContextListing(PagingStore addressStore) {
+         this.addressStore = addressStore;
+      }
+
+      @Override
+      public PagingStore getAddressStore() {
+         return addressStore;
+      }
+
+      private PagingStore addressStore;
+
       private final List<Queue> durableQueue = new ArrayList<>(1);
 
       private final List<Queue> nonDurableQueue = new ArrayList<>(1);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index dcae8935a2..bc5b8b99ff 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -48,6 +49,7 @@ import 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -105,6 +107,54 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
    }
 
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testMaxMessagesSameAddress() throws Exception {
+      testMaxMessages("a/b", "a/b", "a.#");
+   }
+
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testMaxMessagesDifferentAddresses() throws Exception {
+      testMaxMessages("a/b", "a/#", "a.#");
+   }
+
+   private void testMaxMessages(final String publisherTopic, final String 
subscriptionTopic, final String addressSettingsMatch) throws MqttException {
+      final int MAX_SIZE_MESSAGES = 1;
+
+      // ensure too many messages will trigger a failure
+      server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new 
AddressSettings().setMaxSizeMessages(MAX_SIZE_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
+
+      // ensure that the subscription should get the proper max size messages
+      assertEquals(MAX_SIZE_MESSAGES, 
server.getAddressSettingsRepository().getMatch(MQTTUtil.getCoreAddressFromMqttTopic(subscriptionTopic,
 WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION)).getMaxSizeMessages());
+
+      // create and disconnect subscriber and ensure it leaves behind a 
subscription queue on the address
+      MqttClient subscriber = createPahoClient("subscriber");
+      MqttConnectionOptions subscriberOptions = new 
MqttConnectionOptionsBuilder()
+         .cleanStart(false)
+         .sessionExpiryInterval(999L)
+         .build();
+      subscriber.connect(subscriberOptions);
+      subscriber.subscribe(subscriptionTopic, AT_LEAST_ONCE);
+      subscriber.disconnect();
+      assertNotNull(getSubscriptionQueue(subscriptionTopic, "subscriber"));
+
+      // send messages and ensure the max-size-messages is enforced
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      for (int i = 0; i < MAX_SIZE_MESSAGES; i++) {
+         producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
+      }
+      try {
+         producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
+         fail("Should have failed to publish");
+      } catch (MqttException e) {
+         e.printStackTrace();
+         // ignore
+      }
+      assertEquals(MAX_SIZE_MESSAGES, getSubscriptionQueue(subscriptionTopic, 
"subscriber").getMessageCount());
+   }
+
    @Test
    @Timeout(DEFAULT_TIMEOUT_SEC)
    public void testSimpleRetroSendReceive() throws Exception {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
new file mode 100644
index 0000000000..ca06754902
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class WildcardAddressFullTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   final String addressToSend = "a.b.c.d.e.f.g";
+   final String[] queueToReceive = new String[]{"a.b.c.d.e.f.*", 
"a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*", 
"a.*.*.*.*.*.*"};
+   final String addressSettingsMatch = "a.#";
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, createDefaultNettyConfig());
+      server.start();
+      locator = createInVMNonHALocator();
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+   }
+
+   @Test
+   public void testFail() throws Exception {
+
+      final int MAX_MESSAGES = 1;
+      server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new 
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      session.createAddress(SimpleString.of(addressToSend), 
RoutingType.MULTICAST, false);
+      for (String q : queueToReceive) {
+         
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createTopic(addressToSend));
+         for (int i = 0; i < MAX_MESSAGES; i++) {
+            producer.send(session.createTextMessage("will send"));
+         }
+         try {
+            producer.send(session.createTextMessage("should fail"));
+            fail("should fail");
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         for (String q : queueToReceive) {
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(q + "::" + q));
+            for (int i = 0; i < MAX_MESSAGES; i++) {
+               assertNotNull(consumer.receive(5000));
+            }
+         }
+      }
+
+      PagingStore store = 
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+      assertEquals(0L, store.getAddressElements());
+      assertEquals(0, store.getAddressSize());
+
+      for (String q : queueToReceive) {
+         store = server.getPagingManager().getPageStore(SimpleString.of(q));
+         assertEquals(0L, store.getAddressElements());
+         assertEquals(0, store.getAddressSize());
+      }
+   }
+
+   @Test
+   public void testPaging() throws Exception {
+      int producerSend = 10;
+      final int MAX_MESSAGES = 1;
+      server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new 
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      session.createAddress(SimpleString.of(addressToSend), 
RoutingType.MULTICAST, false);
+      for (String q : queueToReceive) {
+         
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createTopic(addressToSend));
+         for (int i = 0; i < producerSend; i++) {
+            producer.send(session.createTextMessage("will send"));
+         }
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         for (String q : queueToReceive) {
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(q + "::" + q));
+            for (int i = 0; i < producerSend; i++) {
+               assertNotNull(consumer.receive(5000));
+            }
+            assertNull(consumer.receiveNoWait());
+         }
+      }
+
+      PagingStore store = 
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+      assertEquals(0L, store.getAddressElements());
+      assertEquals(0, store.getAddressSize());
+
+      for (String q : queueToReceive) {
+         store = server.getPagingManager().getPageStore(SimpleString.of(q));
+         assertEquals(0L, store.getAddressElements());
+         assertEquals(0, store.getAddressSize());
+      }
+   }
+
+   @Test
+   public void testDrop() throws Exception {
+      int producerSend = 10;
+      final int MAX_MESSAGES = 1;
+      server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new 
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      session.createAddress(SimpleString.of(addressToSend), 
RoutingType.MULTICAST, false);
+      for (String q : queueToReceive) {
+         
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createTopic(addressToSend));
+         for (int i = 0; i < producerSend; i++) {
+            producer.send(session.createTextMessage("will send"));
+         }
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         for (String q : queueToReceive) {
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(q + "::" + q));
+            for (int i = 0; i < MAX_MESSAGES; i++) {
+               assertNotNull(consumer.receive(5000));
+            }
+            assertNull(consumer.receiveNoWait());
+         }
+      }
+
+      PagingStore store = 
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+      assertEquals(0L, store.getAddressElements());
+      assertEquals(0, store.getAddressSize());
+
+      for (String q : queueToReceive) {
+         store = server.getPagingManager().getPageStore(SimpleString.of(q));
+         assertEquals(0L, store.getAddressElements());
+         assertEquals(0, store.getAddressSize());
+      }
+   }
+
+   @Test
+   public void testBlock() throws Exception {
+      String addressToSend = "a.b.c.d.e.f.g";
+      String[]  queueToReceive = new String[]{"a.b.c.d.e.f.*", 
"a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*", 
"a.*.*.*.*.*.*"};
+      String addressSettingsMatch = "a.#";
+
+      ExecutorService executorService = Executors.newFixedThreadPool(1 + 
queueToReceive.length);
+      runAfter(executorService::shutdownNow);
+      final int NUMBER_OF_MESSAGES = 100;
+      final int MAX_MESSAGES = 1;
+      server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new 
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      CountDownLatch doneSending = new CountDownLatch(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      executorService.execute(() -> {
+
+         try {
+            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+            session.createAddress(SimpleString.of(addressToSend), 
RoutingType.MULTICAST, false);
+            for (String q : queueToReceive) {
+               
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+            }
+
+            try (Connection connection = factory.createConnection()) {
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               MessageProducer producer = 
session.createProducer(session.createTopic(addressToSend));
+               for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+                  logger.info("Sending message {}", i);
+                  producer.send(session.createTextMessage("a".repeat(10000)));
+               }
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         } finally {
+            doneSending.countDown();
+         }
+      });
+
+      assertFalse(doneSending.await(500, TimeUnit.MILLISECONDS));
+
+      CountDownLatch doneConsume = new CountDownLatch(queueToReceive.length);
+
+      for (String q : queueToReceive) {
+         final String consumerQueue = q;
+         executorService.execute(() -> {
+            ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+            try (Connection connection = factory.createConnection()) {
+               Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               MessageConsumer consumer = 
session.createConsumer(session.createQueue(q + "::" + q));
+               connection.start();
+               for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+                  assertNotNull(consumer.receive(5000));
+                  logger.info("Consumed {} on queue {}", i, q);
+               }
+               assertNull(consumer.receiveNoWait());
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               doneConsume.countDown();
+            }
+         });
+      }
+
+      assertTrue(doneSending.await(5, TimeUnit.SECONDS));
+      assertTrue(doneConsume.await(5, TimeUnit.SECONDS));
+      assertEquals(0, errors.get());
+
+      PagingStore store = 
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+      assertEquals(0L, store.getAddressElements());
+      assertEquals(0, store.getAddressSize());
+
+      for (String q : queueToReceive) {
+         store = server.getPagingManager().getPageStore(SimpleString.of(q));
+         assertEquals(0L, store.getAddressElements());
+         assertEquals(0, store.getAddressSize());
+      }
+   }
+
+}
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 29a8ff9671..923b8a71ad 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -271,6 +271,11 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
 
       }
 
+      @Override
+      public boolean checkFullPolicy(Message message) throws Exception {
+         return true;
+      }
+
       @Override
       public Page usePage(long page, boolean createEntry, boolean createFile) {
          return null;
diff --git 
a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
 
b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
index fda378e755..b84e62bb80 100644
--- 
a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
+++ 
b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
@@ -364,7 +364,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS)));
+         assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore)));
 
          PagedReference readMessage = iterator.next();
 
@@ -398,7 +398,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS)));
+            assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore)));
          }
 
          PagedReference readMessage = iterator.next();
@@ -429,7 +429,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS)));
+            assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore)));
          }
 
          PagedReference readMessage = iterator.next();
@@ -512,7 +512,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
 
                   msg.getBodyBuffer().writeBytes(buffer, 0, 
buffer.writerIndex());
 
-                  assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS)));
+                  assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore)));
                }
 
                if (tx != null) {
@@ -728,7 +728,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS)));
+         assertTrue(pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore)));
       }
 
       return pageStore.getNumberOfPages();
@@ -801,7 +801,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
          Message msg = new CoreMessage(storage.generateID(), 
buffer.writerIndex());
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
          msg.putIntProperty("key", i);
-         pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS));
+         pageStore.page(msg, ctx.getTransaction(), 
ctx.getContextListing(ADDRESS, pageStore));
       }
 
       return txImpl;
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index cc8e11d66d..3d29de4999 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -311,7 +311,7 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       pageStore.start();
       pageStore.startPaging();
 
-      routeContextList = new RoutingContextImpl.ContextListing();
+      routeContextList = new 
RoutingContextImpl.ContextListing(Mockito.mock(PagingStoreImpl.class));
       Queue mockQueue = Mockito.mock(Queue.class);
       Mockito.when(mockQueue.getID()).thenReturn(1L);
       routeContextList.addAckedQueue(mockQueue);
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index 384b37b5b6..506f363aa8 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -78,11 +78,11 @@ public class PagingManagerImplTest extends ActiveMQTestBase 
{
       ICoreMessage msg = createMessage(1L, SimpleString.of("simple-test"), 
createRandomBuffer(10));
 
       final RoutingContextImpl ctx = new RoutingContextImpl(null);
-      assertFalse(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName())));
+      assertFalse(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName(), store)));
 
       store.startPaging();
 
-      assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName())));
+      assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName(), store)));
       syncOperationContext();
 
       Page page = depageOnExecutor(store);
@@ -102,7 +102,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase 
{
       assertNull(depageOnExecutor(store));
 
       final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
-      assertFalse(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName())));
+      assertFalse(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName(), store)));
 
    }
 
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 78baaca17e..0a8d4fce17 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -185,7 +185,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       assertTrue(storeImpl.isPaging());
 
       final RoutingContextImpl ctx = new RoutingContextImpl(null);
-      assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName())));
+      assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
       syncOperationContext();
 
       assertEquals(1, storeImpl.getNumberOfPages());
@@ -227,7 +227,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
          Message msg = createMessage(i, storeImpl, destination, buffer);
          final RoutingContextImpl ctx = new RoutingContextImpl(null);
-         assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName())));
+         assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
          syncOperationContext();
       }
 
@@ -295,7 +295,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                msg.putIntProperty("page", page);
                final RoutingContextImpl ctx = new RoutingContextImpl(null);
                ctx.addQueue(fakeQueue.getName(), fakeQueue);
-               assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName())));
+               assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
                syncOperationContext();
                if (i > 0 && i % 10 == 0) {
                   storeImpl.forceAnotherPage(true);
@@ -413,7 +413,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                msg.putIntProperty("page", page);
                final RoutingContextImpl ctx = new RoutingContextImpl(null);
                ctx.addQueue(fakeQueue.getName(), fakeQueue);
-               assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName())));
+               assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
                syncOperationContext();
                if (i > 0 && i % 10 == 0) {
                   storeImpl.forceAnotherPage(true);
@@ -510,7 +510,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
          msg.putIntProperty("page", 1);
          final RoutingContextImpl ctx = new RoutingContextImpl(null);
          ctx.addQueue(fakeQueue.getName(), fakeQueue);
-         assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName())));
+         assertTrue(storeImpl.page(msg, ctx.getTransaction(), 
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
          syncOperationContext();
       }
 
@@ -567,7 +567,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
          Message msg = createMessage(i, store, destination, buffer);
 
          final RoutingContextImpl ctx = new RoutingContextImpl(null);
-         assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName())));
+         assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName(), store)));
          syncOperationContext();
       }
 
@@ -601,7 +601,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       Message msg = createMessage(1, store, destination, buffers.get(0));
 
       final RoutingContextImpl ctx = new RoutingContextImpl(null);
-      assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName())));
+      assertTrue(store.page(msg, ctx.getTransaction(), 
ctx.getContextListing(store.getStoreName(), store)));
       syncOperationContext();
 
       Page newPage = depageOnExecutor(store);
@@ -622,14 +622,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
 
       {
          final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
-         assertFalse(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName())));
+         assertFalse(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName(), store)));
       }
 
       store.startPaging();
 
       {
          final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
-         assertTrue(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName())));
+         assertTrue(store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName(), store)));
          syncOperationContext();
       }
 
@@ -736,7 +736,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
                   msg.putLongProperty("count", i);
 
                   final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
-                  store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName()));
+                  store.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(store.getStoreName(), store));
                }
                syncOperationContext();
             } catch (Throwable e) {
@@ -1165,7 +1165,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
       msg.putLongProperty("count", id);
 
       final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
-      storeImpl.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(storeImpl.getStoreName()));
+      storeImpl.page(msg, ctx2.getTransaction(), 
ctx2.getContextListing(storeImpl.getStoreName(), storeImpl));
       syncOperationContext();
    }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to