ahuang98 commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2101129753
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,21 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
- if (userEntity.nonEmpty) {
- if (quotaEntity.clientIdEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- else
- quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- } else if (clientEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
+ val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match {
+ case (true, true) => QuotaTypes.UserClientIdQuotaEnabled
+ case (true, false) => QuotaTypes.UserQuotaEnabled
+ case (false, true) => QuotaTypes.ClientIdQuotaEnabled
+ case (false, false) => QuotaTypes.NoQuotas
+ }
quota match {
- case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType,
quotaEntity, newQuota.bound)
- case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+ case Some(newQuota) =>
+ quotaCallback.updateQuota(clientQuotaType, quotaEntity,
newQuota.bound)
+ updateQuotaTypes(quotaTypes, increment = true)
+ case None =>
+ quotaCallback.removeQuota(clientQuotaType, quotaEntity) // change
here.
Review Comment:
don't forget to remove the code comment
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,21 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
- if (userEntity.nonEmpty) {
- if (quotaEntity.clientIdEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- else
- quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- } else if (clientEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
+ val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match {
+ case (true, true) => QuotaTypes.UserClientIdQuotaEnabled
+ case (true, false) => QuotaTypes.UserQuotaEnabled
+ case (false, true) => QuotaTypes.ClientIdQuotaEnabled
+ case (false, false) => QuotaTypes.NoQuotas
Review Comment:
Just want to clarify my understanding of NoQuotas: if this method were
called with a non-empty `quota` but empty `userEntity` and empty
`clientEntity`, would that essentially be a no-op? (I'm assuming that
combination is not expected/possible.)
##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
}
}
+ @Test
+ def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.CONTROLLER_MUTATION, time, "")
+ try {
+ // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and
quotasEnabled should be false
+ assertEquals(QuotaTypes.NoQuotas,
clientQuotaManager.getQuotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+
+ // Add a client-id quota and quotaTypesEnabled should be
QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(
+ None,
+ Some(ClientQuotaManager.ClientIdEntity("client1")),
+ Some(new Quota(5, true))
+ )
+ assertEquals(QuotaTypes.ClientIdQuotaEnabled,
clientQuotaManager.getQuotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
Review Comment:
could we add another case here where we add another `ClientIdQuota` and show
that `getQuotaTypesEnabled` still remains `QuotaTypes.ClientIdQuotaEnabled`?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quota types counts and quotaTypesEnabled flag.
+ * @param quotaTypeKey The QuotaTypes constant (e.g.,
QuotaTypes.UserClientIdQuotaEnabled)
+ * @param increment True to increment count, false to decrement
+ */
+ private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+ if (quotaTypeKey == QuotaTypes.NoQuotas) {
+ return
+ }
+ val previousQuotaTypesEnabled = quotaTypesEnabled
+
+ // Update activeQuotaTypes counts
+ activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+ if (increment) Option(count).getOrElse(0) + 1
+ else if (Option(count).exists(_ > 1)) count - 1
+ else 0
+ )
+
+ // Update quotaTypesEnabled based on activeQuotaTypes counts
+ quotaTypesEnabled = clientQuotaCallbackPlugin match {
+ case Some(_) => QuotaTypes.CustomQuotas
+ case None =>
+ var newQuotaTypes = QuotaTypes.NoQuotas
+ if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled,
0) > 0)
+ newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled
+ if (activeQuotaTypes.getOrDefault(QuotaTypes.UserQuotaEnabled, 0) > 0)
+ newQuotaTypes |= QuotaTypes.UserQuotaEnabled
+ if (activeQuotaTypes.getOrDefault(QuotaTypes.ClientIdQuotaEnabled, 0)
> 0)
+ newQuotaTypes |= QuotaTypes.ClientIdQuotaEnabled
+ newQuotaTypes
+ }
+
+ if (previousQuotaTypesEnabled != quotaTypesEnabled) {
+ debug(s"Quota types enabled changed from $previousQuotaTypesEnabled to
$quotaTypesEnabled")
Review Comment:
Let's say we have positive counts for QuotaTypes.UserClientIdQuotaEnabled
and QuotaTypes.UserQuotaEnabled. When we print `quotaTypesEnabled`, would we
expect to see `6` printed?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quota types counts and quotaTypesEnabled flag.
+ * @param quotaTypeKey The QuotaTypes constant (e.g.,
QuotaTypes.UserClientIdQuotaEnabled)
+ * @param increment True to increment count, false to decrement
+ */
+ private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+ if (quotaTypeKey == QuotaTypes.NoQuotas) {
+ return
+ }
+ val previousQuotaTypesEnabled = quotaTypesEnabled
+
+ // Update activeQuotaTypes counts
+ activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+ if (increment) Option(count).getOrElse(0) + 1
+ else if (Option(count).exists(_ > 1)) count - 1
+ else 0
+ )
+
+ // Update quotaTypesEnabled based on activeQuotaTypes counts
+ quotaTypesEnabled = clientQuotaCallbackPlugin match {
+ case Some(_) => QuotaTypes.CustomQuotas
+ case None =>
+ var newQuotaTypes = QuotaTypes.NoQuotas
+ if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled,
0) > 0)
+ newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled
Review Comment:
should we just make this `newQuotaTypes =
QuotaTypes.UserClientIdQuotaEnabled`?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quota types counts and quotaTypesEnabled flag.
+ * @param quotaTypeKey The QuotaTypes constant (e.g.,
QuotaTypes.UserClientIdQuotaEnabled)
+ * @param increment True to increment count, false to decrement
+ */
+ private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+ if (quotaTypeKey == QuotaTypes.NoQuotas) {
+ return
+ }
+ val previousQuotaTypesEnabled = quotaTypesEnabled
+
+ // Update activeQuotaTypes counts
+ activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+ if (increment) Option(count).getOrElse(0) + 1
+ else if (Option(count).exists(_ > 1)) count - 1
+ else 0
+ )
+
+ // Update quotaTypesEnabled based on activeQuotaTypes counts
Review Comment:
It might be helpful if this comment explained the bitwise operation,
potentially with an example.
e.g. quotaTypesEnabled is the bitwise representation of which quota types are
enabled. For instance, if quotaTypesEnabled == 6, then we know we have
UserClientIdQuotaEnabled and UserQuotaEnabled, since their bit representations
are 4 and 2 (4 | 2 = 6).
##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
}
}
+ @Test
+ def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.CONTROLLER_MUTATION, time, "")
+ try {
+ // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and
quotasEnabled should be false
+ assertEquals(QuotaTypes.NoQuotas,
clientQuotaManager.getQuotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+
+ // Add a client-id quota and quotaTypesEnabled should be
QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(
+ None,
+ Some(ClientQuotaManager.ClientIdEntity("client1")),
+ Some(new Quota(5, true))
+ )
+ assertEquals(QuotaTypes.ClientIdQuotaEnabled,
clientQuotaManager.getQuotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user quota and quotaTypesEnabled should be
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(
+ Some(ClientQuotaManager.UserEntity("userA")),
+ None,
+ Some(new Quota(5, true))
+ )
+ assertEquals(QuotaTypes.UserQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user-client-id quota and quotaTypesEnabled should
+ // be QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled
+ clientQuotaManager.updateQuota(
+ Some(ClientQuotaManager.UserEntity("userB")),
+ Some(ClientQuotaManager.ClientIdEntity("client2")),
+ Some(new Quota(10, true))
+ )
+ assertEquals(
+ QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
| QuotaTypes.UserQuotaEnabled,
+ clientQuotaManager.getQuotaTypesEnabled
+ )
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the user quota and quotaTypesEnabled should be
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
Review Comment:
with the above suggestions, the remove cases become more interesting too -
after removing just one `ClientIdQuota` we would expect that the
quotaTypesEnabled does not change yet
##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
}
}
+ @Test
+ def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.CONTROLLER_MUTATION, time, "")
+ try {
+ // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and
quotasEnabled should be false
+ assertEquals(QuotaTypes.NoQuotas,
clientQuotaManager.getQuotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+
+ // Add a client-id quota and quotaTypesEnabled should be
QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(
+ None,
+ Some(ClientQuotaManager.ClientIdEntity("client1")),
+ Some(new Quota(5, true))
+ )
+ assertEquals(QuotaTypes.ClientIdQuotaEnabled,
clientQuotaManager.getQuotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user quota and quotaTypesEnabled should be
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(
+ Some(ClientQuotaManager.UserEntity("userA")),
+ None,
+ Some(new Quota(5, true))
+ )
+ assertEquals(QuotaTypes.UserQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user-client-id quota and quotaTypesEnabled should
+ // be QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled
+ clientQuotaManager.updateQuota(
+ Some(ClientQuotaManager.UserEntity("userB")),
+ Some(ClientQuotaManager.ClientIdEntity("client2")),
+ Some(new Quota(10, true))
+ )
+ assertEquals(
+ QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
| QuotaTypes.UserQuotaEnabled,
+ clientQuotaManager.getQuotaTypesEnabled
+ )
+ assertTrue(clientQuotaManager.quotasEnabled)
+
Review Comment:
could we add another case here where we add a duplicate quota type (e.g.
UserClientIdQuota) and show that quotasEnabled still remains the same value?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]