This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new df3f6caefe [AMQ-9858] Support purging the first number of messages 
from a queue
df3f6caefe is described below

commit df3f6caefe0763fb46ceac4117b6187bacd98eec
Author: Matt Pavlovich <[email protected]>
AuthorDate: Tue Feb 10 18:04:01 2026 -0600

    [AMQ-9858] Support purging the first number of messages from a queue
---
 .../org/apache/activemq/broker/jmx/QueueView.java  |  8 ++++++
 .../apache/activemq/broker/jmx/QueueViewMBean.java |  8 ++++++
 .../org/apache/activemq/broker/region/Queue.java   | 31 ++++++++++++++++----
 .../org/apache/activemq/broker/jmx/PurgeTest.java  | 33 ++++++++++++++++++++++
 4 files changed, 74 insertions(+), 6 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 067e685683..03a32927e3 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -64,6 +64,14 @@ public class QueueView extends DestinationView implements 
QueueViewMBean {
         LOG.info("{} purge of {} messages", 
destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
     }
 
+    public synchronized void purge(long numberOfMessages) throws Exception {
+        final long originalMessageCount = 
destination.getDestinationStatistics().getMessages().getCount();
+
+        ((Queue)destination).purge(numberOfMessages);
+
+        LOG.info("{} purge {} of {} messages", 
destination.getActiveMQDestination().getQualifiedName(), numberOfMessages, 
originalMessageCount);
+    }
+
     public synchronized boolean removeMessage(String messageId) throws 
Exception {
         return ((Queue)destination).removeMessage(messageId);
     }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
index 27ef61c9f0..67175d4637 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
@@ -70,6 +70,14 @@ public interface QueueViewMBean extends DestinationViewMBean 
{
     @MBeanInfo("Removes all of the messages in the queue.")
     void purge() throws Exception;
 
+    /**
+     * Removes the first number of messages in the queue.
+     *
+     * @throws Exception
+     */
+    @MBeanInfo("Removes the first number of messages in the queue.")
+    void purge(long numberOfMessages) throws Exception;
+
     /**
      * Copies a given message to another destination.
      * 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 048512cafb..cd534c8a35 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1320,9 +1320,20 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     }
 
     public void purge() throws Exception {
+        purge(this.destinationStatistics.getMessages().getCount());
+    }
+
+    public void purge(long numberOfMessages) throws Exception {
+
+        if (numberOfMessages <= 0) {
+            return;
+        }
+
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
         sendLock.lock();
+
+        long purgeCount = 0L;
         try {
             long originalMessageCount = 
this.destinationStatistics.getMessages().getCount();
             do {
@@ -1330,24 +1341,32 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 pagedInMessagesLock.readLock().lock();
                 try {
                     list = new 
ArrayList<MessageReference>(pagedInMessages.values());
-                }finally {
+                } finally {
                     pagedInMessagesLock.readLock().unlock();
                 }
 
-                for (MessageReference ref : list) {
+                int deleteCount = list.size();
+                if ((numberOfMessages - purgeCount) < list.size()) {
+                    deleteCount = (int)(numberOfMessages - purgeCount);
+                }
+
+                for (int n=0; n < deleteCount; n++) {
                     try {
-                        QueueMessageReference r = (QueueMessageReference) ref;
+                        QueueMessageReference r = (QueueMessageReference) 
list.get(n);
                         removeMessage(c, r);
                         messages.rollback(r.getMessageId());
+                        purgeCount++;
                     } catch (IOException e) {
                     }
                 }
                 // don't spin/hang if stats are out and there is nothing left 
in the
                 // store
-            } while (!list.isEmpty() && 
this.destinationStatistics.getMessages().getCount() > 0);
+            } while (!list.isEmpty() &&
+                    this.destinationStatistics.getMessages().getCount() > 0 &&
+                    purgeCount < numberOfMessages);
 
-            if (this.destinationStatistics.getMessages().getCount() > 0) {
-                LOG.warn("{} after purge of {} messages, message count stats 
report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, 
this.destinationStatistics.getMessages().getCount());
+            if (numberOfMessages == originalMessageCount && 
this.destinationStatistics.getMessages().getCount() > 0) {
+                LOG.warn("{} after purge {} of {} messages, message count 
stats report: {}", getActiveMQDestination().getQualifiedName(), 
numberOfMessages, originalMessageCount, 
this.destinationStatistics.getMessages().getCount());
             }
         } finally {
             sendLock.unlock();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
index 780a7f5aa0..7aef8b2be4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
@@ -112,6 +112,39 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
         producer.close();
     }
 
+    public void testPurgeCount() throws Exception {
+        // Send some messages
+        int messagesSent = 1_000;
+        int messagesPurge = 200;
+
+        connection = connectionFactory.createConnection();
+        connection.setClientID(clientID);
+        connection.start();
+        Session session = connection.createSession(transacted, authMode);
+        destination = createDestination();
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < messagesSent; i++) {
+            Message message = session.createTextMessage("Message: " + i);
+            producer.send(message);
+        }
+
+        // Now get the QueueViewMBean and purge
+        String objectNameStr = broker.getBrokerObjectName().toString();
+        objectNameStr += 
",destinationType=Queue,destinationName="+getDestinationString();
+        ObjectName queueViewMBeanName = 
assertRegisteredObjectName(objectNameStr);
+        QueueViewMBean proxy = 
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
queueViewMBeanName, QueueViewMBean.class, true);
+
+        long count = proxy.getQueueSize();
+        assertEquals("Queue size", count, messagesSent);
+
+        for (int i = 1; i <= 5; i++) {
+            proxy.purge(messagesPurge);
+            count = proxy.getQueueSize();
+            assertEquals("Queue size", count, messagesSent - (messagesPurge * 
i));
+        }
+        producer.close();
+    }
+
     public void initCombosForTestDelete() {
         addCombinationValues("persistenceAdapter", new Object[] {new 
MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to