gemmellr commented on code in PR #5128:
URL: https://github.com/apache/activemq-artemis/pull/5128#discussion_r1718225568
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java:
##########
@@ -288,6 +296,408 @@ public void
testPagingDoesNotDuplicateBatchMessagesAfterPagingStarted() throws E
}
}
+ @Test
+ public void testPageLimitBytesValidation() throws Exception {
+ final String addressName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(false, false);
+
+ SimpleString queueAddr = SimpleString.of(addressName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ int size = 1048576;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) size);
+ addressSettings.setMaxSizeBytes(size);
+
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ int totalMessages = 15;
+ int messageSize = 90000;
+ sendMessageBatch(totalMessages, messageSize, session, queueAddr);
+
+ Queue queue = server.locateQueue(queueAddr);
+
+ // Give time Queue.deliverAsync to deliver messages
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPaging());
+
+ assertFalse(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to be smaller than pageSizeBytes
+ addressSettings.setPageLimitBytes((long) (size - 1));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // check the settings applied
+ assertEquals(size - 1, queuePagingStore.getPageLimitBytes());
+
+ // check pageFull is true
+ assertTrue(queuePagingStore.isPageFull());
+
+ // send a messages should be immediately blocked (in our case FAIL)
+ try {
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ fail("should be immediate blocked on paging");
+ } catch (ActiveMQAddressFullException ex) {
+ //ok
+ }
+
+ assertTrue(queuePagingStore.isPageFull());
+
+ // set pageLimitBytes to bigger value to unblock paging again
+ addressSettings.setPageLimitBytes((long) (size * 2));
+ server.getAddressSettingsRepository().addMatch(addressName,
addressSettings);
+
+ // now page is enabled again
+ assertFalse(queuePagingStore.isPageFull());
+
+ sendMessageBatch(1, messageSize, session, queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages + 1, 10000));
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitMessagesFirst()
throws Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ final int size = 1024 * 50;
+ final Long maxMessages = 10L;
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes((long) (size * 10));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the messages reach the limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+
assertEquals(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider),
maxMessages);
+
+ // but pages still under limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ assertTrue(existingPages <= queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes());
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but current pages still under limit
+ long currentPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages <= maxPages);
+
+ // now increase the max messages to unblock procuder
+ final Long bigLimitMessages = 1000L;
+ addressSettings.setPageLimitMessages(bigLimitMessages);
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertFalse(queuePagingStore.isPageFull());
+
+ // now send more messages until pagefull
+ locator = createFactory(isNetty());
+ try (ClientSessionFactory csf = createSessionFactory(locator)) {
+ stop = false;
+ session = csf.createSession(true, true);
+ producer = session.createProducer(queueAddr);
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session,
messageSize, totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ assertTrue(totalMessages <= bigLimitMessages, "test is
broken");
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+ }
+
+ // check the page full
+ assertTrue(queuePagingStore.isPageFull());
+ // because it reaches pageLimitBytes
+ currentPages = queuePagingStore.getNumberOfPages();
+ maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+ assertTrue(currentPages > maxPages);
+
+ // and messages still below limit messages
+ cursorProvider = queuePagingStore.getCursorProvider();
+ assertEquals(bigLimitMessages,
queuePagingStore.getPageLimitMessages());
+
assertTrue(PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider)
< queuePagingStore.getPageLimitMessages());
+ } finally {
+ server.stop(true);
+ }
+ }
+ }
+
+ @Test
+ public void
testPageLimitBytesAndPageLimitMessagesValidationBlockOnLimitBytesFirst() throws
Exception {
+
+ final String queueName = getTestMethodName();
+
+ try (ClientSessionFactory sf = createSessionFactory(locator)) {
+ ClientSession session = sf.createSession(true, true);
+
+ SimpleString queueAddr = SimpleString.of(queueName);
+ session.createQueue(QueueConfiguration.of(queueAddr));
+
+ // the numbers should make sure page files reach the pageLimitBytes
before pageLimitMessages
+ final int size = 1024 * 50;
+ final Long maxMessages = 200L;
+
+ AddressSettings addressSettings = new AddressSettings();
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setPageFullMessagePolicy(PageFullMessagePolicy.FAIL);
+ addressSettings.setPageSizeBytes(size);
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 5));
+ addressSettings.setMaxSizeBytes(size);
+ addressSettings.setPageLimitMessages(maxMessages);
+
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ int totalMessages = 0;
+ int messageSize = 1024;
+ ClientProducer producer = session.createProducer(queueAddr);
+ boolean stop = false;
+ while (!stop) {
+ try {
+ ClientMessage message = createMessage(session, messageSize,
totalMessages, null);
+ producer.send(message);
+ totalMessages++;
+ session.commit();
+ } catch (ActiveMQAddressFullException ex) {
+ stop = true;
+ }
+ }
+
+ Queue queue = server.locateQueue(queueAddr);
+ assertTrue(waitForMessages(queue, totalMessages, 10000));
+
+ PagingStore queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null && queuePagingStore.isPageFull());
+
+ // the pages reach the limit
+ long existingPages = queuePagingStore.getNumberOfPages();
+ long maxPages = queuePagingStore.getPageLimitBytes() /
queuePagingStore.getPageSizeBytes();
+
+ assertTrue(existingPages > maxPages);
+
+ // but messages still under limit
+ PageCursorProvider cursorProvider =
queuePagingStore.getCursorProvider();
+ Long existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be less than max " + maxMessages);
+
+ server.stop(true);
+ waitForServerToStop(server);
+
+ // restart the server the pageFull is still true
+ try {
+ server.start();
+ waitForServerToStart(server);
+
+ queue = server.locateQueue(queueAddr);
+
+ queuePagingStore = queue.getPagingStore();
+ assertTrue(queuePagingStore != null &&
queuePagingStore.isPageFull());
+
+ // but messages still under limit
+ cursorProvider = queuePagingStore.getCursorProvider();
+ existingMessages =
PageCursorProviderTestAccessor.getNumberOfMessagesOnSubscriptions(cursorProvider);
+ assertTrue(existingMessages < maxMessages, "existing " +
existingMessages + " should be still less than max " + maxMessages);
+
+ // now increase the pageLimitBytes to unblock procuder
+ addressSettings.setPageLimitBytes(Long.valueOf(size * 20));
+ server.getAddressSettingsRepository().addMatch(queueName,
addressSettings);
+
+ // no longer page full
+ assertTrue(queuePagingStore != null &&
!queuePagingStore.isPageFull());
Review Comment:
The `queuePagingStore != null` check is superfluous here: the test has already
asserted that it isn't null and hasn't reassigned the variable since (and it
has already used the variable again without a null check).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact