This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6df92f57fd KAFKA-17999: Fix flaky DynamicConnectionQuotaTest testDynamicConnectionQuota (#21354)
e6df92f57fd is described below

commit e6df92f57fdf0605f598fd6ff3a0fdbc6b06d9fc
Author: ChickenchickenLove <[email protected]>
AuthorDate: Tue May 5 23:47:13 2026 +0900

    KAFKA-17999: Fix flaky DynamicConnectionQuotaTest testDynamicConnectionQuota (#21354)
    
    Fix flakiness in `DynamicConnectionQuotaTest#testDynamicConnectionQuota` by
    waiting until the broker’s dynamically updated `max.connections.per.ip` and
    `max.connections.per.ip.overrides` values are actually applied in
    `ConnectionQuotas` before running each verification.
    
    Reviewers: Mickael Maison <[email protected]>
---
 core/src/main/scala/kafka/network/SocketServer.scala      | 10 ++++++++++
 .../kafka/network/DynamicConnectionQuotaTest.scala        | 15 +++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bc91329d4bf..7aa170fb178 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1390,6 +1390,16 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
     connectionRatePerIp.getOrDefault(ip, defaultConnectionRatePerIp)
   }
 
+  // Visible for testing
+  private[network] def maxConnectionsPerIpForIp(ip: InetAddress): Int = {
+    maxConnectionsPerIpOverrides.getOrElse(ip, defaultMaxConnectionsPerIp)
+  }
+  
+  // Visible for testing
+  private[network] def maxConnectionsPerIpOverrideForIp(ip: InetAddress): 
Option[Int] = {
+    maxConnectionsPerIpOverrides.get(ip)
+  }
+
   private[network] def addListener(config: KafkaConfig, listenerName: 
ListenerName): Unit = {
     counts.synchronized {
       if (!maxConnectionsPerListener.contains(listenerName)) {
diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
index e19bc9a0238..26c09a81880 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record.internal.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.common.{KafkaException, Uuid, requests}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.config.QuotaConfig
@@ -81,10 +80,11 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  @Flaky("KAFKA-17999")
   @Test
   def testDynamicConnectionQuota(): Unit = {
     val maxConnectionsPerIP = 5
+    val localhostAddr = InetAddress.getByName("localhost")
+    val quotas = brokers.head.socketServer.connectionQuotas
 
     def connectAndVerify(): Unit = {
       val socket = connect()
@@ -99,12 +99,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, 
maxConnectionsPerIP.toString)
     reconfigureServers(props, perBrokerConfig = false, 
(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, 
maxConnectionsPerIP.toString))
 
+    TestUtils.waitUntilTrue(
+      () => quotas.maxConnectionsPerIpForIp(localhostAddr) == 
maxConnectionsPerIP,
+      s"maxConnectionsPerIp not applied yet for ip=localhost 
(expected=$maxConnectionsPerIP, 
current=${quotas.maxConnectionsPerIpForIp(localhostAddr)})"
+    )
+    
     verifyMaxConnections(maxConnectionsPerIP, connectAndVerify)
 
     // Increase MaxConnectionsPerIpOverrides for localhost to 7
     val maxConnectionsPerIPOverride = 7
     props.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
s"localhost:$maxConnectionsPerIPOverride")
     reconfigureServers(props, perBrokerConfig = false, 
(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
s"localhost:$maxConnectionsPerIPOverride"))
+    
+    TestUtils.waitUntilTrue(
+      () => 
quotas.maxConnectionsPerIpOverrideForIp(localhostAddr).contains(maxConnectionsPerIPOverride),
+      s"maxConnectionsPerIpOverrides not applied yet for ip=localhost 
(expected=$maxConnectionsPerIPOverride, " +
+        s"current=${quotas.maxConnectionsPerIpOverrideForIp(localhostAddr)})"
+    )
 
     verifyMaxConnections(maxConnectionsPerIPOverride, connectAndVerify)
   }

Reply via email to