gemmellr commented on code in PR #5128:
URL: https://github.com/apache/activemq-artemis/pull/5128#discussion_r1718212763
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
Review Comment:
Since at no point has it been validated that these getters are returning the
values we expect, this isn't strictly checking that it's working.
We should either use the initial PageSizeBytes and PageLimitBytes values we
provided in the settings to calculate the expected pages limit, or else check
that the getters are returning those values (or do both), e.g. as the previous
test did for PageLimitBytes.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
Review Comment:
procuder -> producer
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock procuder
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // current pages not exceeds the max pages
+ Long currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
Review Comment:
Again, this can verify against the pre-known values, and/or verify that the
getters are accurate.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock procuder
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // current pages not exceeds the max pages
+ Long currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // however messages reaches limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(maxMessages, queuePagingStore.getPageLimitMessages());
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes((long) (size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless of current page count >
(pageLimitBytes / pageSizeBytes)
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ // verify current situation
+ queue = server.locateQueue(queueAddr);
+
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging()
&& queuePagingStore.isPageFull());
+
+ long currentPages = queuePagingStore.getNumberOfPages();
+ assertEquals(existingPages, currentPages);
+
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
Review Comment:
Again, this can verify against the pre-known values, and/or verify that the
getters are accurate.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -232,7 +233,7 @@ private void configureSizeMetric() {
* @param addressSettings
*/
@Override
- public void applySetting(final AddressSettings addressSettings) {
+ public void applySetting(final AddressSettings addressSettings, final
boolean firstTime) {
Review Comment:
Since it is really an implementation detail that the constructor calls the
applySetting method, I think this interface method should probably just go
back to the way it was, and a private internal second method be created with
the firstTime boolean that both this method's implementation and the
constructor can then call. That way neither the interface nor any of the tests
etc. need to be updated to support calling with false (and they shouldn't be
able to call it with true).
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
Review Comment:
Similarly here too
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
Review Comment:
Similarly here, it should validate against a pre-known value and/or check
the getters are correct, rather than trusting that they are.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock procuder
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
Review Comment:
queuePagingStore != null is superfluous here, since the test already checked
it wasn't null and hasn't changed the variable since (but has used it again
without checking it isn't null).
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
Review Comment:
Would be clearer to use assertNotNull rather than the combined check
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
Review Comment:
Similarly here also
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock procuder
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // current pages not exceeds the max pages
+ Long currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // however messages reaches limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(maxMessages, queuePagingStore.getPageLimitMessages());
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes((long) (size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless of current page count >
(pageLimitBytes / pageSizeBytes)
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ // verify current situation
+ queue = server.locateQueue(queueAddr);
+
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging()
&& queuePagingStore.isPageFull());
+
+ long currentPages = queuePagingStore.getNumberOfPages();
+ assertEquals(existingPages, currentPages);
+
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ //consume messages until current pages goes down to below maxPage
+ locator = createFactory(isNetty());
+ final int numMessages = 25;
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ session = csf.createSession(false, true);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(queueName);
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ session.commit();
+ }
+
+ currentPages = queuePagingStore.getNumberOfPages();
+ assertTrue(currentPages < maxPages);
+ // check page store not page full
+ assertTrue(queuePagingStore.isPaging() &&
!queuePagingStore.isPageFull());
+
+ //send messages one by one until page full
+ ClientProducer producer = session.createProducer(queueName);
+ boolean isFull = false;
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = createMessage(session,
messageSize, i, null);
+ producer.send(message);
+ session.commit();
+ }
+ } catch (ActiveMQAddressFullException e) {
+ isFull = true;
+ session.close();
Review Comment:
The session close should move after the try-catch, as we always want to do it,
and it would otherwise fail to happen if the loop somehow succeeded.
EDIT: actually, since the wider try-with-resources block with the
ClientSessionFactory csf is there, the session always gets closed, right? So can
this just be omitted rather than look wrong?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock producer
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // current pages not exceeds the max pages
+ Long currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // however messages reaches limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(maxMessages, queuePagingStore.getPageLimitMessages());
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void testPageLimitBytesValidationOnRestart() throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1024 * 50;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 10));
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 30;
+ int messageSize = 1024 * 10;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages > 4);
+
+ // restart the server with a smaller pageLimitSize < existing pages.
+ server.stop(true);
+ waitForServerToStop(server);
+
+ addressSettings.setPageLimitBytes((long) (size * 4));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // server will start regardless of current page count >
(pageLimitBytes / pageSizeBytes)
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ // verify current situation
+ queue = server.locateQueue(queueAddr);
+
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging()
&& queuePagingStore.isPageFull());
Review Comment:
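Use assertNotNull for the null check here instead of folding `queuePagingStore != null` into the assertTrue condition.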
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // sending a message should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock producer
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock producer
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
Review Comment:
No need for the null check here: queuePagingStore was already asserted non-null and dereferenced earlier in this test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact