This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new df3f6caefe [AMQ-9858] Support purging the first number of messages from a queue
df3f6caefe is described below
commit df3f6caefe0763fb46ceac4117b6187bacd98eec
Author: Matt Pavlovich <[email protected]>
AuthorDate: Tue Feb 10 18:04:01 2026 -0600
[AMQ-9858] Support purging the first number of messages from a queue
---
.../org/apache/activemq/broker/jmx/QueueView.java | 8 ++++++
.../apache/activemq/broker/jmx/QueueViewMBean.java | 8 ++++++
.../org/apache/activemq/broker/region/Queue.java | 31 ++++++++++++++++----
.../org/apache/activemq/broker/jmx/PurgeTest.java | 33 ++++++++++++++++++++++
4 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 067e685683..03a32927e3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -64,6 +64,14 @@ public class QueueView extends DestinationView implements QueueViewMBean {
LOG.info("{} purge of {} messages",
destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
}
+ public synchronized void purge(long numberOfMessages) throws Exception {
+ final long originalMessageCount =
destination.getDestinationStatistics().getMessages().getCount();
+
+ ((Queue)destination).purge(numberOfMessages);
+
+ LOG.info("{} purge {} of {} messages",
destination.getActiveMQDestination().getQualifiedName(), numberOfMessages,
originalMessageCount);
+ }
+
public synchronized boolean removeMessage(String messageId) throws
Exception {
return ((Queue)destination).removeMessage(messageId);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
index 27ef61c9f0..67175d4637 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
@@ -70,6 +70,14 @@ public interface QueueViewMBean extends DestinationViewMBean {
@MBeanInfo("Removes all of the messages in the queue.")
void purge() throws Exception;
+ /**
+ * Removes the first number of messages in the queue.
+ *
+ * @throws Exception
+ */
+ @MBeanInfo("Removes the first number of messages in the queue.")
+ void purge(long numberOfMessages) throws Exception;
+
/**
* Copies a given message to another destination.
*
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 048512cafb..cd534c8a35 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1320,9 +1320,20 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
public void purge() throws Exception {
+ purge(this.destinationStatistics.getMessages().getCount());
+ }
+
+ public void purge(long numberOfMessages) throws Exception {
+
+ if (numberOfMessages <= 0) {
+ return;
+ }
+
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
sendLock.lock();
+
+ long purgeCount = 0L;
try {
long originalMessageCount =
this.destinationStatistics.getMessages().getCount();
do {
@@ -1330,24 +1341,32 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInMessagesLock.readLock().lock();
try {
list = new
ArrayList<MessageReference>(pagedInMessages.values());
- }finally {
+ } finally {
pagedInMessagesLock.readLock().unlock();
}
- for (MessageReference ref : list) {
+ int deleteCount = list.size();
+ if ((numberOfMessages - purgeCount) < list.size()) {
+ deleteCount = (int)(numberOfMessages - purgeCount);
+ }
+
+ for (int n=0; n < deleteCount; n++) {
try {
- QueueMessageReference r = (QueueMessageReference) ref;
+ QueueMessageReference r = (QueueMessageReference)
list.get(n);
removeMessage(c, r);
messages.rollback(r.getMessageId());
+ purgeCount++;
} catch (IOException e) {
}
}
// don't spin/hang if stats are out and there is nothing left in the
// store
- } while (!list.isEmpty() &&
this.destinationStatistics.getMessages().getCount() > 0);
+ } while (!list.isEmpty() &&
+ this.destinationStatistics.getMessages().getCount() > 0 &&
+ purgeCount < numberOfMessages);
- if (this.destinationStatistics.getMessages().getCount() > 0) {
- LOG.warn("{} after purge of {} messages, message count stats
report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount,
this.destinationStatistics.getMessages().getCount());
+ if (numberOfMessages == originalMessageCount &&
this.destinationStatistics.getMessages().getCount() > 0) {
+ LOG.warn("{} after purge {} of {} messages, message count
stats report: {}", getActiveMQDestination().getQualifiedName(),
numberOfMessages, originalMessageCount,
this.destinationStatistics.getMessages().getCount());
}
} finally {
sendLock.unlock();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
index 780a7f5aa0..7aef8b2be4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
@@ -112,6 +112,39 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
producer.close();
}
+ public void testPurgeCount() throws Exception {
+ // Send some messages
+ int messagesSent = 1_000;
+ int messagesPurge = 200;
+
+ connection = connectionFactory.createConnection();
+ connection.setClientID(clientID);
+ connection.start();
+ Session session = connection.createSession(transacted, authMode);
+ destination = createDestination();
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < messagesSent; i++) {
+ Message message = session.createTextMessage("Message: " + i);
+ producer.send(message);
+ }
+
+ // Now get the QueueViewMBean and purge
+ String objectNameStr = broker.getBrokerObjectName().toString();
+ objectNameStr +=
",destinationType=Queue,destinationName="+getDestinationString();
+ ObjectName queueViewMBeanName =
assertRegisteredObjectName(objectNameStr);
+ QueueViewMBean proxy =
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+
+ long count = proxy.getQueueSize();
+ assertEquals("Queue size", count, messagesSent);
+
+ for (int i = 1; i <= 5; i++) {
+ proxy.purge(messagesPurge);
+ count = proxy.getQueueSize();
+ assertEquals("Queue size", count, messagesSent - (messagesPurge *
i));
+ }
+ producer.close();
+ }
+
public void initCombosForTestDelete() {
addCombinationValues("persistenceAdapter", new Object[] {new
MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact