This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e6df92f57fd KAFKA-17999: Fix flaky DynamicConnectionQuotaTest
testDynamicConnectionQuota (#21354)
e6df92f57fd is described below
commit e6df92f57fdf0605f598fd6ff3a0fdbc6b06d9fc
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue May 5 23:47:13 2026 +0900
KAFKA-17999: Fix flaky DynamicConnectionQuotaTest
testDynamicConnectionQuota (#21354)
Fix flakiness in `DynamicConnectionQuotaTest#testDynamicConnectionQuota` by
waiting until the broker’s `max.connections.per.ip` and
`max.connections.per.ip.overrides` settings are actually applied in
`ConnectionQuotas` before running each verification.
Reviewers: Mickael Maison <[email protected]>
---
core/src/main/scala/kafka/network/SocketServer.scala | 10 ++++++++++
.../kafka/network/DynamicConnectionQuotaTest.scala | 15 +++++++++++++--
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index bc91329d4bf..7aa170fb178 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1390,6 +1390,16 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)
}
+ // Visible for testing
+ private[network] def maxConnectionsPerIpForIp(ip: InetAddress): Int = {
+ maxConnectionsPerIpOverrides.getOrElse(ip, defaultMaxConnectionsPerIp)
+ }
+
+ // Visible for testing
+ private[network] def maxConnectionsPerIpOverrideForIp(ip: InetAddress):
Option[Int] = {
+ maxConnectionsPerIpOverrides.get(ip)
+ }
+
private[network] def addListener(config: KafkaConfig, listenerName:
ListenerName): Unit = {
counts.synchronized {
if (!maxConnectionsPerListener.contains(listenerName)) {
diff --git
a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index e19bc9a0238..26c09a81880 100644
---
a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++
b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity}
import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.test.api.Flaky
import org.apache.kafka.common.{KafkaException, Uuid, requests}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.QuotaConfig
@@ -81,10 +80,11 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
}
}
- @Flaky("KAFKA-17999")
@Test
def testDynamicConnectionQuota(): Unit = {
val maxConnectionsPerIP = 5
+ val localhostAddr = InetAddress.getByName("localhost")
+ val quotas = brokers.head.socketServer.connectionQuotas
def connectAndVerify(): Unit = {
val socket = connect()
@@ -99,12 +99,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
maxConnectionsPerIP.toString)
reconfigureServers(props, perBrokerConfig = false,
(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
maxConnectionsPerIP.toString))
+ TestUtils.waitUntilTrue(
+ () => quotas.maxConnectionsPerIpForIp(localhostAddr) ==
maxConnectionsPerIP,
+ s"maxConnectionsPerIp not applied yet for ip=localhost
(expected=$maxConnectionsPerIP,
current=${quotas.maxConnectionsPerIpForIp(localhostAddr)})"
+ )
+
verifyMaxConnections(maxConnectionsPerIP, connectAndVerify)
// Increase MaxConnectionsPerIpOverrides for localhost to 7
val maxConnectionsPerIPOverride = 7
props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
s"localhost:$maxConnectionsPerIPOverride")
reconfigureServers(props, perBrokerConfig = false,
(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
s"localhost:$maxConnectionsPerIPOverride"))
+
+ TestUtils.waitUntilTrue(
+ () =>
quotas.maxConnectionsPerIpOverrideForIp(localhostAddr).contains(maxConnectionsPerIPOverride),
+ s"maxConnectionsPerIpOverrides not applied yet for ip=localhost
(expected=$maxConnectionsPerIPOverride, " +
+ s"current=${quotas.maxConnectionsPerIpOverrideForIp(localhostAddr)})"
+ )
verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify)
}