rajinisivaram commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2167158972
##########
clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java:
##########
@@ -106,6 +107,14 @@ public interface ClientQuotaCallback extends Configurable {
*/
boolean updateClusterMetadata(Cluster cluster);
+ /**
+ * Return a set of active client quota entities, which represent the
quotas currently in effect and applicable.
+ * If the callback does not track active client quotas, it may return an
empty set.
+ *
+ * @return a set of active client quota entities
+ */
+ Set<ClientQuotaEntity> getActiveQuotasEntities();
Review Comment:
This is public API, we cannot change it without a KIP. I don't think we need
this in the interface anyway.
When custom callbacks are used, the code that relies on quotaTypesEnabled
doesn't care about any of the types except QuotaTypes.CustomQuotas. So as long
as that is set, the code path would be the same, regardless of the entities. So
we only need to track quota entities for default callback, which we can do
without modifying the interface. Hope I haven't missed the intention of
including this in the interface.
##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +505,156 @@ class ClientQuotaManagerTest extends
BaseClientQuotaManagerTest {
}
}
+ @Test
+ def testQuotaTypesEnabledUpdatesWithDefaultCallback(): Unit = {
+ val clientQuotaManager = new ClientQuotaManager(config, metrics,
QuotaType.CONTROLLER_MUTATION, time, "")
+ try {
+ // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and
quotasEnabled should be false
+ assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+
+ // Add a client-id quota, quotaTypesEnabled should be
QuotaTypes.ClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(None,
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(5, true)))
+ assertEquals(QuotaTypes.ClientIdQuotaEnabled,
clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user quota, quotaTypesEnabled should be
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
None, Some(new Quota(5, true)))
+ assertEquals(QuotaTypes.UserQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a duplicate client-id quota, quotaTypesEnabled should remain
unchanged
+ clientQuotaManager.updateQuota(None,
Some(ClientQuotaManager.ClientIdEntity("client2")), Some(new Quota(5, true)))
+ assertEquals(QuotaTypes.UserQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add duplicate user quota, quotaTypesEnabled should remain unchanged
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")),
None, Some(new Quota(5, true)))
+ assertEquals(QuotaTypes.UserQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add a user-client-id quota, quotaTypesEnabled should be
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled |
QuotaTypes.UserQuotaEnabled
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(10, true)))
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled,
clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Add Duplicate user-client-id quota, quotaTypesEnabled should remain
unchanged
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true)))
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled,
clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the first user quota, quotaTypesEnabled should remain unchanged
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
None, None)
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled,
clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the second user quota, quotaTypesEnabled should be
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")),
None, None)
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the first client-id quota, quotaTypesEnabled should remain
unchanged
+ clientQuotaManager.updateQuota(None,
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled |
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the second client-id quota, quotaTypesEnabled should be
QuotaTypes.UserClientIdQuotaEnabled
+ clientQuotaManager.updateQuota(None,
Some(ClientQuotaManager.ClientIdEntity("client2")), None)
+ assertEquals(QuotaTypes.UserClientIdQuotaEnabled,
clientQuotaManager.quotaTypesEnabled)
+ assertTrue(clientQuotaManager.quotasEnabled)
+
+ // Remove the first user-client-id quota, quotaTypesEnabled should be
noQuotas as both user-client-id quotas has the same user client but different
quota
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+ assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+
+ // Remove the second user-client-id quota, quotaTypesEnabled should be
QuotaTypes.NoQuotas and quotasEnabled should be false
+
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")),
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+ assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+ assertFalse(clientQuotaManager.quotasEnabled)
+ } finally {
+ clientQuotaManager.shutdown()
+ }
+ }
+
+ @Test
+ def testQuotaTypesEnabledUpdatesWithCustomCallback(): Unit = {
Review Comment:
We don't rely on quotaTypesEnabled with custom callbacks, so don't need this
test. Or change it to just verify that QuotaTypes.CustomQuotas is set.
##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -479,6 +479,8 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback
with Reconfigurable w
override def quotaResetRequired(quotaType: ClientQuotaType): Boolean =
customQuotasUpdated(quotaType).getAndSet(false)
+ override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] =
Set.empty[ClientQuotaEntity].asJava
Review Comment:
Not needed.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -647,6 +690,9 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag)
}
+ override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = {
+ overriddenQuotas.keySet().asScala.toSet.asJava
Review Comment:
Unnecessary conversions, just need `overriddenQuotas.keySet()`
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +449,51 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Updates `quotaTypesEnabled` by performing a bitwise OR operation to
combine the enabled quota types.
+ * This method ensures that the `quotaTypesEnabled` field reflects the
active quota types based on the
+ * current state of `activeQuotaEntities`.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ *
+ * @param quotaEntity The entity for which the quota is being updated,
which can be a combination of user and client-id.
+ * @param shouldAdd A boolean indicating whether to add or remove the
quota entity.
+ */
+ private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd:
Boolean): Unit = {
+
+ val activeQuotaType = quotaEntity match {
+ case KafkaQuotaEntity(Some(_), Some(_)) =>
QuotaTypes.UserClientIdQuotaEnabled
+ case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled
+ case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled
+ case _ => QuotaTypes.NoQuotas
+ }
+
+ val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity)
Review Comment:
Since we cannot put `getActiveQuotasEntities` in the interface, we can
instead do:
```
val isActive = quotaCallback match {
case callback: DefaultQuotaCallback =>
callback.getActiveQuotasEntities.contains(quotaEntity)
case _ => true
}
```
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -154,6 +155,11 @@ public boolean updateClusterMetadata(Cluster cluster) {
return true;
}
+ @Override
+ public Set<ClientQuotaEntity> getActiveQuotasEntities() {
+ return Set.of();
+ }
Review Comment:
We shouldn't rely on updating custom quota callbacks since users may have
their own implementations.
##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1759,6 +1759,8 @@ class DummyClientQuotaCallback extends
ClientQuotaCallback with Reconfigurable {
override def close(): Unit = {}
+ override def getActiveQuotasEntities: util.Set[quota.ClientQuotaEntity] =
util.Collections.emptySet()
Review Comment:
Not needed.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +449,51 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Updates `quotaTypesEnabled` by performing a bitwise OR operation to
combine the enabled quota types.
+ * This method ensures that the `quotaTypesEnabled` field reflects the
active quota types based on the
+ * current state of `activeQuotaEntities`.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ *
+ * @param quotaEntity The entity for which the quota is being updated,
which can be a combination of user and client-id.
+ * @param shouldAdd A boolean indicating whether to add or remove the
quota entity.
+ */
+ private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd:
Boolean): Unit = {
+
+ val activeQuotaType = quotaEntity match {
+ case KafkaQuotaEntity(Some(_), Some(_)) =>
QuotaTypes.UserClientIdQuotaEnabled
+ case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled
+ case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled
+ case _ => QuotaTypes.NoQuotas
+ }
+
+ val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity)
+ if (shouldAdd && !isActive) {
+ activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if
(currentValue == 0) 1 else currentValue + 1)
+ quotaTypesEnabled |= activeQuotaType
+ } else if (!shouldAdd && isActive) {
+ activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if
(currentValue <= 1) 0 else currentValue - 1)
+ if (activeQuotaEntities.get(activeQuotaType) == 0) {
+ quotaTypesEnabled &= ~activeQuotaType
+ }
+ }
+
+ val quotaTypes = List(
+ QuotaTypes.UserClientIdQuotaEnabled -> "UserClientIdQuota",
+ QuotaTypes.ClientIdQuotaEnabled -> "ClientIdQuota",
+ QuotaTypes.UserQuotaEnabled -> "UserQuota"
+ )
+
+ val activeEntities = quotaTypes.collect {
+ case (k, name) if activeQuotaEntities.get(k) > 0 => name
+ }.mkString(", ")
+ info(s"Quota types enabled has been changed to $quotaTypesEnabled with
active quota entities: [$activeEntities]")
Review Comment:
We should probably only do `updateQuotaTypes()` only if custom quotas are
not used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]