This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d21c5c50ac ARTEMIS-6011 Wildcard routing / Address full or dropped is
not properly propagated
d21c5c50ac is described below
commit d21c5c50ac3ceda8cc7d36c72b045eee9f2e7011
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 17 10:28:05 2026 -0400
ARTEMIS-6011 Wildcard routing / Address full or dropped is not properly
propagated
Co-authored with Justin Bertram
---
.../connect/mirror/AMQPMirrorControllerSource.java | 8 +
.../activemq/artemis/core/paging/PagingStore.java | 2 +
.../artemis/core/paging/impl/PagingStoreImpl.java | 100 +++----
.../core/postoffice/impl/PostOfficeImpl.java | 19 ++
.../artemis/core/server/RouteContextList.java | 4 +
.../artemis/core/server/RoutingContext.java | 7 +-
.../cluster/impl/RemoteQueueBindingImpl.java | 4 +-
.../core/server/impl/RoutingContextImpl.java | 28 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 50 ++++
.../paging/WildcardAddressFullTest.java | 301 +++++++++++++++++++++
.../storage/PersistMultiThreadTest.java | 5 +
.../tests/stress/paging/PageCursorStressTest.java | 12 +-
.../core/paging/impl/PageTimedWriterUnitTest.java | 2 +-
.../core/paging/impl/PagingManagerImplTest.java | 6 +-
.../unit/core/paging/impl/PagingStoreImplTest.java | 22 +-
15 files changed, 486 insertions(+), 84 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b9e018c6e4..ea1ff1cc86 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -27,6 +27,7 @@ import
org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -787,12 +788,19 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
static class PagedRouteContext implements RouteContextList {
+ private final PagingStore addressStore;
private final List<Queue> durableQueues;
private final List<Queue> nonDurableQueues;
+ @Override
+ public PagingStore getAddressStore() {
+ return addressStore;
+ }
+
PagedRouteContext(Queue snfQueue) {
List<Queue> queues = new ArrayList<>(1);
queues.add(snfQueue);
+ this.addressStore = snfQueue.getPagingStore();
if (snfQueue.isDurable()) {
durableQueues = queues;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 55775c5b53..898034f7a8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -139,6 +139,8 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
Page usePage(long page);
+ boolean checkFullPolicy(Message message) throws Exception;
+
/**
* Use this method when you want to use the cache of used pages. If you are
just using offline (e.g. print-data), use
* the newPageObject method.
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 544b329e2e..4b63e0abc2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1424,66 +1424,50 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
- public int page(Message message,
- final Transaction tx,
- RouteContextList listCtx,
- Function<Message, Message> pageDecorator,
- boolean useFlowControl) throws Exception {
-
- if (!running) {
- return -1;
- }
+ public boolean checkFullPolicy(Message message) throws Exception {
boolean diskFull = pagingManager.isDiskFull();
- if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP ||
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
- if (diskFull) {
- if (message.isLargeMessage()) {
- ((LargeServerMessage) message).deleteFile();
- }
-
- if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
- throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
- }
+ if (diskFull && (diskFullMessagePolicy == DiskFullMessagePolicy.DROP ||
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL)) {
+ if (message.isLargeMessage()) {
+ ((LargeServerMessage) message).deleteFile();
+ }
- // Dist is full, just drop the data
- if (!printedDropMessagesWarning) {
- printedDropMessagesWarning = true;
- ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
- }
+ if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+ throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+ }
- return 0;
+ // Disk is full, just drop the data
+ if (!printedDropMessagesWarning) {
+ printedDropMessagesWarning = true;
+ ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
}
- }
- boolean full = isFull();
+ return false;
+ }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP ||
addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
- if (full) {
+ if (isFull()) {
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (message.isLargeMessage()) {
- ((LargeServerMessage) message).deleteFile();
+ removeLargeMessage(message);
}
-
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
- throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+ throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+ } else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
{
+ if (message.isLargeMessage()) {
+ removeLargeMessage(message);
}
-
- // Address is full, we just pretend we are paging, and drop the
data
+ // address is full, just drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
}
- return 0;
- } else {
- return -1;
+ return false;
}
- } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
- return -1;
}
if (pageFull) {
if (message.isLargeMessage()) {
- ((LargeServerMessage) message).deleteFile();
+ removeLargeMessage(message);
}
if (pageFullMessagePolicy == PageFullMessagePolicy.FAIL) {
@@ -1494,20 +1478,36 @@ public class PagingStoreImpl implements PagingStore {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
}
-
- // we are in page mode, if we got to this point, we are dropping the
message while still paging
- // we return 0 as in the storage is in "page mode" however no credits
are being taken.
- return 0;
+ return false;
}
- return writePage(message, tx, listCtx, pageDecorator, useFlowControl);
+ return true;
+ }
+
+ private static void removeLargeMessage(Message message) {
+ try {
+ ((LargeServerMessage) message).deleteFile();
+ } catch (Exception e) {
+ // only thing to be done is log on this case
+ logger.debug("Error deleting large message file for {}", message, e);
+ }
}
- private int writePage(Message message,
- Transaction tx,
- RouteContextList listCtx,
- Function<Message, Message> pageDecorator,
- boolean useFlowControl) throws Exception {
+ @Override
+ public int page(Message message,
+ final Transaction tx,
+ RouteContextList listCtx,
+ Function<Message, Message> pageDecorator,
+ boolean useFlowControl) throws Exception {
+
+ if (!running) {
+ return -1;
+ }
+
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
+ return -1;
+ }
+
// We need to use a readLock as we need to keep paging until we
scheduled a task
// notice that to leave paging you need pending tasks done
readLock();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 86275ac2ad..d263295a6a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1685,6 +1685,25 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
final SimpleString messageAddress = message.getAddressSimpleString();
final PagingStore owningStore =
pagingManager.getPageStore(messageAddress);
message.setOwner(owningStore);
+ boolean dropMessages = false;
+ if (owningStore != null) {
+ if (!owningStore.checkFullPolicy(message)) {
+ dropMessages = true;
+ }
+ }
+ for (Map.Entry<SimpleString, RouteContextList> entry :
context.getContexListing().entrySet()) {
+ final PagingStore store = entry.getValue().getAddressStore();
+ if (store != null) {
+ if (!store.checkFullPolicy(message)) {
+ dropMessages = true;
+ }
+ }
+ }
+
+ if (dropMessages) {
+ return;
+ }
+
for (Map.Entry<SimpleString, RouteContextList> entry :
context.getContexListing().entrySet()) {
final PagingStore store;
if (entry.getKey().equals(messageAddress)) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
index b4f9bd761f..3d1c57cf14 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RouteContextList.java
@@ -18,11 +18,15 @@ package org.apache.activemq.artemis.core.server;
import java.util.List;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+
/**
* This is a simple datatype containing the list of a routing context
*/
public interface RouteContextList {
+ PagingStore getAddressStore();
+
int getNumberOfNonDurableQueues();
int getNumberOfDurableQueues();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index fe9c4f3f1f..c784f84138 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -78,11 +79,11 @@ public interface RoutingContext {
Map<SimpleString, RouteContextList> getContexListing();
- RouteContextList getContextListing(SimpleString address);
+ RouteContextList getContextListing(SimpleString address, PagingStore
addressStore);
- List<Queue> getNonDurableQueues(SimpleString address);
+ List<Queue> getNonDurableQueues(SimpleString address, PagingStore
addressStore);
- List<Queue> getDurableQueues(SimpleString address);
+ List<Queue> getDurableQueues(SimpleString address, PagingStore
addressStore);
int getQueueCount();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 6eda81f6cf..a65866c61e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -182,7 +182,7 @@ public class RemoteQueueBindingImpl implements
RemoteQueueBinding {
public void route(final Message message, final RoutingContext context) {
addRouteContextToMessage(message);
- List<Queue> durableQueuesOnContext =
context.getDurableQueues(storeAndForwardQueue.getAddress());
+ List<Queue> durableQueuesOnContext =
context.getDurableQueues(storeAndForwardQueue.getAddress(),
storeAndForwardQueue.getPagingStore());
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
// There can be many remote bindings for the same node, we only want
to add the message once to
@@ -195,7 +195,7 @@ public class RemoteQueueBindingImpl implements
RemoteQueueBinding {
public void routeWithAck(Message message, RoutingContext context) {
addRouteContextToMessage(message);
- List<Queue> durableQueuesOnContext =
context.getDurableQueues(storeAndForwardQueue.getAddress());
+ List<Queue> durableQueuesOnContext =
context.getDurableQueues(storeAndForwardQueue.getAddress(),
storeAndForwardQueue.getPagingStore());
if (!durableQueuesOnContext.contains(storeAndForwardQueue)) {
// There can be many remote bindings for the same node, we only want
to add the message once to
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 1f7a7a9dc6..55d8318d68 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -204,7 +205,7 @@ public class RoutingContextImpl implements RoutingContext {
@Override
public void addQueue(final SimpleString address, final Queue queue) {
- RouteContextList listing = getContextListing(address);
+ RouteContextList listing = getContextListing(address,
queue.getPagingStore());
if (queue.isDurable()) {
listing.getDurableQueues().add(queue);
@@ -257,7 +258,7 @@ public class RoutingContextImpl implements RoutingContext {
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {
addQueue(address, queue);
- RouteContextList listing = getContextListing(address);
+ RouteContextList listing = getContextListing(address,
queue.getPagingStore());
listing.addAckedQueue(queue);
}
@@ -316,10 +317,10 @@ public class RoutingContextImpl implements RoutingContext
{
}
@Override
- public RouteContextList getContextListing(SimpleString address) {
+ public RouteContextList getContextListing(SimpleString address, PagingStore
addressStore) {
RouteContextList listing = map.get(address);
if (listing == null) {
- listing = new ContextListing();
+ listing = new ContextListing(addressStore);
map.put(address, listing);
}
return listing;
@@ -336,13 +337,13 @@ public class RoutingContextImpl implements RoutingContext
{
}
@Override
- public List<Queue> getNonDurableQueues(SimpleString address) {
- return getContextListing(address).getNonDurableQueues();
+ public List<Queue> getNonDurableQueues(SimpleString address, PagingStore
addressStore) {
+ return getContextListing(address, addressStore).getNonDurableQueues();
}
@Override
- public List<Queue> getDurableQueues(SimpleString address) {
- return getContextListing(address).getDurableQueues();
+ public List<Queue> getDurableQueues(SimpleString address, PagingStore
addressStore) {
+ return getContextListing(address, addressStore).getDurableQueues();
}
@Override
@@ -368,6 +369,17 @@ public class RoutingContextImpl implements RoutingContext {
public static class ContextListing implements RouteContextList {
+ public ContextListing(PagingStore addressStore) {
+ this.addressStore = addressStore;
+ }
+
+ @Override
+ public PagingStore getAddressStore() {
+ return addressStore;
+ }
+
+ private PagingStore addressStore;
+
private final List<Queue> durableQueue = new ArrayList<>(1);
private final List<Queue> nonDurableQueue = new ArrayList<>(1);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index dcae8935a2..bc5b8b99ff 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
@@ -48,6 +49,7 @@ import
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerSession;
import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -105,6 +107,54 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testMaxMessagesSameAddress() throws Exception {
+ testMaxMessages("a/b", "a/b", "a.#");
+ }
+
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testMaxMessagesDifferentAddresses() throws Exception {
+ testMaxMessages("a/b", "a/#", "a.#");
+ }
+
+ private void testMaxMessages(final String publisherTopic, final String
subscriptionTopic, final String addressSettingsMatch) throws MqttException {
+ final int MAX_SIZE_MESSAGES = 1;
+
+ // ensure too many messages will trigger a failure
+ server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new
AddressSettings().setMaxSizeMessages(MAX_SIZE_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
+
+ // ensure that the subscription should get the proper max size messages
+ assertEquals(MAX_SIZE_MESSAGES,
server.getAddressSettingsRepository().getMatch(MQTTUtil.getCoreAddressFromMqttTopic(subscriptionTopic,
WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION)).getMaxSizeMessages());
+
+ // create and disconnect subscriber and ensure it leaves behind a
subscription queue on the address
+ MqttClient subscriber = createPahoClient("subscriber");
+ MqttConnectionOptions subscriberOptions = new
MqttConnectionOptionsBuilder()
+ .cleanStart(false)
+ .sessionExpiryInterval(999L)
+ .build();
+ subscriber.connect(subscriberOptions);
+ subscriber.subscribe(subscriptionTopic, AT_LEAST_ONCE);
+ subscriber.disconnect();
+ assertNotNull(getSubscriptionQueue(subscriptionTopic, "subscriber"));
+
+ // send messages and ensure the max-size-messages is enforced
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ for (int i = 0; i < MAX_SIZE_MESSAGES; i++) {
+ producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
+ }
+ try {
+ producer.publish(publisherTopic, RandomUtil.randomBytes(), 1, false);
+ fail("Should have failed to publish");
+ } catch (MqttException e) {
+ e.printStackTrace();
+ // ignore
+ }
+ assertEquals(MAX_SIZE_MESSAGES, getSubscriptionQueue(subscriptionTopic,
"subscriber").getMessageCount());
+ }
+
@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testSimpleRetroSendReceive() throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
new file mode 100644
index 0000000000..ca06754902
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/WildcardAddressFullTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class WildcardAddressFullTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected ActiveMQServer server;
+ protected ClientSession session;
+ protected ClientSessionFactory sf;
+ protected ServerLocator locator;
+
+ final String addressToSend = "a.b.c.d.e.f.g";
+ final String[] queueToReceive = new String[]{"a.b.c.d.e.f.*",
"a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*",
"a.*.*.*.*.*.*"};
+ final String addressSettingsMatch = "a.#";
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ server = createServer(true, createDefaultNettyConfig());
+ server.start();
+ locator = createInVMNonHALocator();
+ sf = createSessionFactory(locator);
+ session = addClientSession(sf.createSession(false, true, true));
+ }
+
+ @Test
+ public void testFail() throws Exception {
+
+ final int MAX_MESSAGES = 1;
+ server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ session.createAddress(SimpleString.of(addressToSend),
RoutingType.MULTICAST, false);
+ for (String q : queueToReceive) {
+
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createTopic(addressToSend));
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ producer.send(session.createTextMessage("will send"));
+ }
+ try {
+ producer.send(session.createTextMessage("should fail"));
+ fail("should fail");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ for (String q : queueToReceive) {
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(q + "::" + q));
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ }
+ }
+
+ PagingStore store =
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+
+ for (String q : queueToReceive) {
+ store = server.getPagingManager().getPageStore(SimpleString.of(q));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+ }
+ }
+
+ @Test
+ public void testPaging() throws Exception {
+ int producerSend = 10;
+ final int MAX_MESSAGES = 1;
+ server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ session.createAddress(SimpleString.of(addressToSend),
RoutingType.MULTICAST, false);
+ for (String q : queueToReceive) {
+
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createTopic(addressToSend));
+ for (int i = 0; i < producerSend; i++) {
+ producer.send(session.createTextMessage("will send"));
+ }
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ for (String q : queueToReceive) {
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(q + "::" + q));
+ for (int i = 0; i < producerSend; i++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+ }
+
+ PagingStore store =
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+
+ for (String q : queueToReceive) {
+ store = server.getPagingManager().getPageStore(SimpleString.of(q));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+ }
+ }
+
+ @Test
+ public void testDrop() throws Exception {
+ int producerSend = 10;
+ final int MAX_MESSAGES = 1;
+ server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ session.createAddress(SimpleString.of(addressToSend),
RoutingType.MULTICAST, false);
+ for (String q : queueToReceive) {
+
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createTopic(addressToSend));
+ for (int i = 0; i < producerSend; i++) {
+ producer.send(session.createTextMessage("will send"));
+ }
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ for (String q : queueToReceive) {
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(q + "::" + q));
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ assertNull(consumer.receiveNoWait());
+ }
+ }
+
+ PagingStore store =
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+
+ for (String q : queueToReceive) {
+ store = server.getPagingManager().getPageStore(SimpleString.of(q));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+ }
+ }
+
+ @Test
+ public void testBlock() throws Exception {
+ String addressToSend = "a.b.c.d.e.f.g";
+ String[] queueToReceive = new String[]{"a.b.c.d.e.f.*",
"a.b.c.d.e.*.*", "a.b.c.d.*.*.*", "a.b.c.*.*.*.*", "a.b.*.*.*.*.*",
"a.*.*.*.*.*.*"};
+ String addressSettingsMatch = "a.#";
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1 +
queueToReceive.length);
+ runAfter(executorService::shutdownNow);
+ final int NUMBER_OF_MESSAGES = 100;
+ final int MAX_MESSAGES = 1;
+ server.getAddressSettingsRepository().addMatch(addressSettingsMatch, new
AddressSettings().setMaxSizeMessages(MAX_MESSAGES).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+ CountDownLatch doneSending = new CountDownLatch(1);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ executorService.execute(() -> {
+
+ try {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ session.createAddress(SimpleString.of(addressToSend),
RoutingType.MULTICAST, false);
+ for (String q : queueToReceive) {
+
session.createQueue(QueueConfiguration.of(q).setRoutingType(RoutingType.MULTICAST));
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createTopic(addressToSend));
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ logger.info("Sending message {}", i);
+ producer.send(session.createTextMessage("a".repeat(10000)));
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ doneSending.countDown();
+ }
+ });
+
+ assertFalse(doneSending.await(500, TimeUnit.MILLISECONDS));
+
+ CountDownLatch doneConsume = new CountDownLatch(queueToReceive.length);
+
+ for (String q : queueToReceive) {
+ final String consumerQueue = q;
+ executorService.execute(() -> {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(q + "::" + q));
+ connection.start();
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ assertNotNull(consumer.receive(5000));
+ logger.info("Consumed {} on queue {}", i, q);
+ }
+ assertNull(consumer.receiveNoWait());
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ doneConsume.countDown();
+ }
+ });
+ }
+
+ assertTrue(doneSending.await(5, TimeUnit.SECONDS));
+ assertTrue(doneConsume.await(5, TimeUnit.SECONDS));
+ assertEquals(0, errors.get());
+
+ PagingStore store =
server.getPagingManager().getPageStore(SimpleString.of(addressToSend));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+
+ for (String q : queueToReceive) {
+ store = server.getPagingManager().getPageStore(SimpleString.of(q));
+ assertEquals(0L, store.getAddressElements());
+ assertEquals(0, store.getAddressSize());
+ }
+ }
+
+}
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 29a8ff9671..923b8a71ad 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -271,6 +271,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
+ @Override
+ public boolean checkFullPolicy(Message message) throws Exception {
+ return true;
+ }
+
@Override
public Page usePage(long page, boolean createEntry, boolean createFile) {
return null;
diff --git a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
index fda378e755..b84e62bb80 100644
--- a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
+++ b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/paging/PageCursorStressTest.java
@@ -364,7 +364,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS)));
+ assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore)));
PagedReference readMessage = iterator.next();
@@ -398,7 +398,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS)));
+ assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore)));
}
PagedReference readMessage = iterator.next();
@@ -429,7 +429,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS)));
+ assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore)));
}
PagedReference readMessage = iterator.next();
@@ -512,7 +512,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0,
buffer.writerIndex());
- assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS)));
+ assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore)));
}
if (tx != null) {
@@ -728,7 +728,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
- assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS)));
+ assertTrue(pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore)));
}
return pageStore.getNumberOfPages();
@@ -801,7 +801,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
Message msg = new CoreMessage(storage.generateID(),
buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
- pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS));
+ pageStore.page(msg, ctx.getTransaction(),
ctx.getContextListing(ADDRESS, pageStore));
}
return txImpl;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index cc8e11d66d..3d29de4999 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -311,7 +311,7 @@ public class PageTimedWriterUnitTest extends ArtemisTestCase {
pageStore.start();
pageStore.startPaging();
- routeContextList = new RoutingContextImpl.ContextListing();
+ routeContextList = new
RoutingContextImpl.ContextListing(Mockito.mock(PagingStoreImpl.class));
Queue mockQueue = Mockito.mock(Queue.class);
Mockito.when(mockQueue.getID()).thenReturn(1L);
routeContextList.addAckedQueue(mockQueue);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
index 384b37b5b6..506f363aa8 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingManagerImplTest.java
@@ -78,11 +78,11 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
ICoreMessage msg = createMessage(1L, SimpleString.of("simple-test"),
createRandomBuffer(10));
final RoutingContextImpl ctx = new RoutingContextImpl(null);
- assertFalse(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName())));
+ assertFalse(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName(), store)));
store.startPaging();
- assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName())));
+ assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName(), store)));
syncOperationContext();
Page page = depageOnExecutor(store);
@@ -102,7 +102,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
assertNull(depageOnExecutor(store));
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
- assertFalse(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName())));
+ assertFalse(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName(), store)));
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 78baaca17e..0a8d4fce17 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -185,7 +185,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
assertTrue(storeImpl.isPaging());
final RoutingContextImpl ctx = new RoutingContextImpl(null);
- assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName())));
+ assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
syncOperationContext();
assertEquals(1, storeImpl.getNumberOfPages());
@@ -227,7 +227,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(i, storeImpl, destination, buffer);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
- assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName())));
+ assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
syncOperationContext();
}
@@ -295,7 +295,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putIntProperty("page", page);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(fakeQueue.getName(), fakeQueue);
- assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName())));
+ assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
syncOperationContext();
if (i > 0 && i % 10 == 0) {
storeImpl.forceAnotherPage(true);
@@ -413,7 +413,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putIntProperty("page", page);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(fakeQueue.getName(), fakeQueue);
- assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName())));
+ assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
syncOperationContext();
if (i > 0 && i % 10 == 0) {
storeImpl.forceAnotherPage(true);
@@ -510,7 +510,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putIntProperty("page", 1);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(fakeQueue.getName(), fakeQueue);
- assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName())));
+ assertTrue(storeImpl.page(msg, ctx.getTransaction(),
ctx.getContextListing(storeImpl.getStoreName(), storeImpl)));
syncOperationContext();
}
@@ -567,7 +567,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(i, store, destination, buffer);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
- assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName())));
+ assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName(), store)));
syncOperationContext();
}
@@ -601,7 +601,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(1, store, destination, buffers.get(0));
final RoutingContextImpl ctx = new RoutingContextImpl(null);
- assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName())));
+ assertTrue(store.page(msg, ctx.getTransaction(),
ctx.getContextListing(store.getStoreName(), store)));
syncOperationContext();
Page newPage = depageOnExecutor(store);
@@ -622,14 +622,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
{
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
- assertFalse(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName())));
+ assertFalse(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName(), store)));
}
store.startPaging();
{
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
- assertTrue(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName())));
+ assertTrue(store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName(), store)));
syncOperationContext();
}
@@ -736,7 +736,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putLongProperty("count", i);
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
- store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName()));
+ store.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(store.getStoreName(), store));
}
syncOperationContext();
} catch (Throwable e) {
@@ -1165,7 +1165,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putLongProperty("count", id);
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
- storeImpl.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(storeImpl.getStoreName()));
+ storeImpl.page(msg, ctx2.getTransaction(),
ctx2.getContextListing(storeImpl.getStoreName(), storeImpl));
syncOperationContext();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]