This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3ae70545f3 [fix][test] Fix flaky MessagePublishBufferThrottleTest.testBlockByPublishRateLimiting (#25365)
a3ae70545f3 is described below
commit a3ae70545f3aef40e8f5cfcced907e6500b08894
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 18:08:38 2026 -0700
[fix][test] Fix flaky MessagePublishBufferThrottleTest.testBlockByPublishRateLimiting (#25365)
---
.../service/MessagePublishBufferThrottleTest.java | 60 ++++++++++++++--------
1 file changed, 39 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 0faae14da08..6a74df3eecc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.broker.service;
import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
-import static org.testng.Assert.fail;
-import java.util.concurrent.CompletableFuture;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -134,33 +133,32 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
pulsarTestContext.getMockBookKeeper().addEntryDelay(5,
TimeUnit.SECONDS);
- // Block by publish buffer.
+ // Block by publish buffer: 10 x 1MB messages with a 1MB buffer limit.
byte[] payload = new byte[1024 * 1024];
for (int i = 0; i < 10; i++) {
producer.sendAsync(payload);
}
- Awaitility.await().untilAsserted(() ->
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1));
+ // Wait for at least one pause event to be recorded.
+ Awaitility.await().untilAsserted(
+ () ->
assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName.PAUSED, 1));
- CompletableFuture<Void> flushFuture = producer.flushAsync();
+ // Verify that no resume has happened yet while messages are still
blocked.
+ Awaitility.await().untilAsserted(
+ () ->
assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0));
- // Block by publish rate.
- // After 1 second, the message buffer throttling will be lifted, but
the rate limiting will still be in place.
- assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1);
- assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
-
- try {
- flushFuture.get(2, TimeUnit.SECONDS);
- fail("Should have timed out");
- } catch (TimeoutException e) {
- // Ok
- }
-
- flushFuture.join();
+ // Flush and wait for all messages to complete.
+ producer.flush();
+ // After all messages are sent, the number of pauses and resumes
should match:
+ // every pause must eventually be followed by a resume.
Awaitility.await().untilAsserted(() -> {
- assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED,
10);
- assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED,
10);
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ long pausedCount = getMetricLongSumValue(metrics,
ConnectionRateLimitOperationName.PAUSED);
+ long resumedCount = getMetricLongSumValue(metrics,
ConnectionRateLimitOperationName.RESUMED);
+ Assert.assertTrue(pausedCount > 0, "Expected at least one pause
event");
+ Assert.assertEquals(pausedCount, resumedCount,
+ "Paused and resumed counts should match after all messages
are sent");
});
}
@@ -206,4 +204,24 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
connectionRateLimitState.attributes, expectedCount);
}
}
+
+    /**
+     * Asserts that the connection rate-limit counter for the given operation is at least
+     * {@code minExpectedCount}.
+     *
+     * <p>Any value greater than or equal to the bound is accepted, rather than requiring an
+     * exact count, so additional recorded events do not fail the check.
+     *
+     * @param connectionRateLimitState operation whose {@code attributes} select the data point
+     *                                 of {@code CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME}
+     * @param minExpectedCount         inclusive lower bound for the counter value
+     */
+    private void assertRateLimitCounterAtLeast(ConnectionRateLimitOperationName connectionRateLimitState,
+                                               int minExpectedCount) {
+        // Collect a fresh snapshot on every call so repeated invocations (e.g. inside
+        // Awaitility.untilAsserted) observe the latest counter value.
+        var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+                connectionRateLimitState.attributes,
+                actual -> assertThat(actual).isGreaterThanOrEqualTo(minExpectedCount));
+    }
+
+    /**
+     * Reads the current value of the connection rate-limit counter for the given operation.
+     *
+     * <p>Unlike the assert helpers, this returns the raw value so callers can compare counters
+     * with each other (e.g. paused vs. resumed) instead of against a fixed expectation.
+     *
+     * @param metrics                  snapshot of metrics collected from the OpenTelemetry reader
+     * @param connectionRateLimitState operation whose {@code attributes} identify the data point
+     * @return the value of the first long-sum point of
+     *         {@code CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME} whose attribute map equals the
+     *         operation's attribute map, or {@code 0} if there is no such point
+     */
+    private long getMetricLongSumValue(Collection<MetricData> metrics,
+                                       ConnectionRateLimitOperationName connectionRateLimitState) {
+        var attributesMap = connectionRateLimitState.attributes.asMap();
+        return metrics.stream()
+                .filter(m -> m.getName().equals(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME))
+                .flatMap(m -> m.getLongSumData().getPoints().stream())
+                // Exact map equality: a point with extra or missing attributes does not match.
+                .filter(point -> point.getAttributes().asMap().equals(attributesMap))
+                .mapToLong(io.opentelemetry.sdk.metrics.data.LongPointData::getValue)
+                // NOTE(review): assumes at most one matching point per snapshot; any further
+                // matches are ignored rather than summed — confirm this holds for the reader.
+                .findFirst()
+                // A missing data point is reported as 0.
+                .orElse(0L);
+    }
}