This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 69fe3a147f ARTEMIS-6031 Handle credit starvation affecting Core bridge
69fe3a147f is described below
commit 69fe3a147f72cbc57685fa554a97569ec85ce1b2
Author: AntonRoskvist <[email protected]>
AuthorDate: Wed Apr 29 09:01:26 2026 +0200
ARTEMIS-6031 Handle credit starvation affecting Core bridge
---
.../client/impl/AbstractProducerCreditsImpl.java | 4 +-
.../impl/AsynchronousProducerCreditsImplTest.java | 48 +++++++++++
.../integration/cluster/bridge/BridgeTest.java | 94 ++++++++++++++++++++++
3 files changed, 144 insertions(+), 2 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index ac83c29b1a..8e9381eead 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -162,7 +162,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
int toRequest = -1;
synchronized (this) {
- if (getBalance() + arriving < needed) {
+ if (getBalance() + arriving <= needed) {
toRequest = needed - arriving;
if (logger.isTraceEnabled()) {
@@ -170,7 +170,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
}
} else {
if (logger.isTraceEnabled()) {
- logger.trace("CheckCredits did not need it, balance={},
arriving={}, needed={}, getbalance + arriving < needed={}", getBalance(),
arriving, needed, (boolean)(getBalance() + arriving < needed));
+ logger.trace("CheckCredits did not need it, balance={},
arriving={}, needed={}, getbalance + arriving <= needed={}", getBalance(),
arriving, needed, (boolean)(getBalance() + arriving <= needed));
}
}
}
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
index 7783894764..8e32131eed 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImplTest.java
@@ -17,10 +17,16 @@
package org.apache.activemq.artemis.core.client.impl;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class AsynchronousProducerCreditsImplTest {
@Test
@@ -32,4 +38,46 @@ public class AsynchronousProducerCreditsImplTest {
Mockito.verify(session).sendProducerCreditsMessage(0, null);
}
+ @Test
+ @Timeout(10)
+ public void testCreditsRequestedWhenMessageSizeExactlyEqualsBalance()
throws Exception {
+ ClientSessionInternal mockClientSession =
Mockito.mock(ClientSessionInternal.class);
+
+ AtomicInteger creditsRequested = new AtomicInteger(0);
+ AtomicBoolean blocked = new AtomicBoolean(false);
+
+ int producerWindowSize = 1000;
+
+ Mockito.doAnswer(inv -> {
+ creditsRequested.addAndGet(inv.getArgument(0));
+ return null;
+ }).when(mockClientSession).sendProducerCreditsMessage(Mockito.anyInt(),
Mockito.any());
+
+ AsynchronousProducerCreditsImpl producerCredits = new
AsynchronousProducerCreditsImpl(mockClientSession, null, producerWindowSize,
new ClientProducerFlowCallback() {
+
+ @Override
+ public void onCreditsFlow(boolean isBlocked, ClientProducerCredits
credits) {
+ blocked.set(isBlocked);
+ }
+
+ @Override
+ public void onCreditsFail(ClientProducerCredits credits) {
+ }
+
+ });
+
+ int internalWindowSize = producerWindowSize / 2;
+ // drain balance to just above internalWindowSize
+ producerCredits.actualAcquire(internalWindowSize - 100);
+
+ int messageSize = producerCredits.getBalance();
+ producerCredits.acquireCredits(messageSize);
+
+ assertTrue(creditsRequested.get() > 0, "credits must be requested when
message size exactly equals balance");
+
+ producerCredits.receiveCredits(creditsRequested.get());
+ assertTrue(producerCredits.getBalance() > 0);
+ assertFalse(blocked.get());
+ }
+
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 65b9264648..e5d3202acc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -270,6 +270,100 @@ public class BridgeTest extends ActiveMQTestBase {
long timeTaken = System.currentTimeMillis() - time;
}
+ @TestTemplate
+ public void testBridgeHandlesReachingZeroCredits() throws Exception {
+ final int numMessages = 10;
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ final String bridgeName = "bridge0";
+
+ final int flowControlSize = 1024 * 10;
+ final AtomicInteger messageForwardSize = new AtomicInteger(0);
+
+ //This is used to capture the actual message size that the bridge will send
+ Transformer transformer = message -> {
+ messageForwardSize.addAndGet(message.getEncodeSize());
+ return message;
+ };
+
+ Map<String, Object> server0Params = new HashMap<>();
+ Map<String, Object> server1Params = new HashMap<>();
+
+ addTargetParameters(server1Params);
+
+ server0 = createClusteredServerWithParams(isNetty(), 0, true,
server0Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true,
server1Params);
+
+ server0.getServiceRegistry().addBridgeTransformer(bridgeName,
transformer);
+ TransportConfiguration server0tc = new
TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new
TransportConfiguration(getConnector(), server1Params);
+
+
server0.getConfiguration().setConnectorConfigurations(Map.of(server1tc.getName(),
server1tc));
+
+ server0.start();
+ server1.start();
+
+
server0.createQueue(QueueConfiguration.of(queueName0).setAddress(testAddress));
+
server1.createQueue(QueueConfiguration.of(queueName1).setAddress(forwardAddress));
+
+ locator =
addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc,
server1tc));
+
+ server0.deployBridge(new BridgeConfiguration()
+ .setName(bridgeName)
+ .setQueueName(queueName0)
+ .setForwardingAddress(forwardAddress)
+ .setProducerWindowSize(flowControlSize)
+
.setStaticConnectors(List.of(server1tc.getName())));
+
+ ClientSessionFactory sf0 =
addSessionFactory(locator.createSessionFactory(server0tc));
+ ClientSessionFactory sf1 =
addSessionFactory(locator.createSessionFactory(server1tc));
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSession session1 = sf1.createSession(false, true, true);
+ session1.start();
+
+ ClientProducer producer0 =
session0.createProducer(SimpleString.of(testAddress));
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ // send empty message to capture the size of a base message + bridge property
+ producer0.send(session0.createMessage(true));
+ Wait.assertTrue(() -> messageForwardSize.get() > 0);
+
+ // messageForwardSize is multiplied by two to account for previous and upcoming message
+ // the intention is to land on exactly 0 credits after the next message is sent
+ int sendSize = flowControlSize - (messageForwardSize.get() * 2);
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session0.createMessage(true);
+ message.getBodyBuffer().writeBytes(new byte[sendSize]);
+ producer0.send(message);
+ }
+
+ for (int i = 0; i < numMessages + 1; i++) {
+ ClientMessage message = consumer1.receive(1000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+ session1.close();
+ sf0.close();
+ sf1.close();
+
+ closeFields();
+
+ if (server0.getConfiguration().isPersistenceEnabled()) {
+ assertEquals(0, loadQueues(server0).size());
+ }
+
+ }
+
@TestTemplate
public void testBlockedBridgeAndReconnect() throws Exception {
long time = System.currentTimeMillis();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]