rajinisivaram commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2162513574
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -155,9 +155,10 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
case None => new DefaultQuotaCallback
}
private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+ private val activeQuotaEntities = new util.HashSet[KafkaQuotaEntity]()
Review Comment:
Do we really need to maintain this new set of quota entities? When custom
quotas are used, I am not sure we can optimize based on active entities anyway.
And for the default quota callback, we already have the map containing all
entities. Couldn't we use that instead?
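A minimal sketch of deriving the enabled quota types from an existing
entity-to-quota map rather than from a separate set. Everything here is a
hypothetical stand-in, not the PR's code: `KafkaQuotaEntity` is simplified to
two optional strings, the map type and the `QuotaTypesFromMap` helper are
assumptions, and the flag values are assumed from the first Javadoc example in
the PR (ClientIdQuotaEnabled = 1, UserQuotaEnabled = 2) plus
UserClientIdQuotaEnabled = 4.

```scala
// Hypothetical stand-ins; flag values are assumed for illustration only.
object QuotaTypes {
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4
}

// Simplified entity: the real class wraps user and client-id entity objects.
final case class KafkaQuotaEntity(user: Option[String], clientId: Option[String])

object QuotaTypesFromMap {
  // Returns the bitmask of quota types that have at least one key in `quotas`.
  // Each `exists` call stops at the first matching key.
  def enabledTypes(quotas: scala.collection.Map[KafkaQuotaEntity, Double]): Int = {
    def flagIf(flag: Int)(matches: KafkaQuotaEntity => Boolean): Int =
      if (quotas.keysIterator.exists(matches)) flag else QuotaTypes.NoQuotas

    flagIf(QuotaTypes.UserClientIdQuotaEnabled)(e => e.user.isDefined && e.clientId.isDefined) |
      flagIf(QuotaTypes.UserQuotaEnabled)(e => e.user.isDefined && e.clientId.isEmpty) |
      flagIf(QuotaTypes.ClientIdQuotaEnabled)(e => e.user.isEmpty && e.clientId.isDefined)
  }
}
```

Note that when a quota type has no entries, its `exists` call still walks the
whole key set, so this only removes the extra set, not the worst-case scan.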
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+ QuotaTypes.CustomQuotas
+ } else {
+ QuotaTypes.NoQuotas
+ }
+
+ val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+ activeQuotaEntities.forEach {
+ case KafkaQuotaEntity(Some(_), Some(_)) =>
+ updatedQuotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+ activeQuotaTypes += "UserClientIdQuota"
+ case KafkaQuotaEntity(Some(_), None) =>
+ updatedQuotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+ activeQuotaTypes += "UserQuota"
+ case KafkaQuotaEntity(None, Some(_)) =>
+ updatedQuotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+ activeQuotaTypes += "ClientIdQuota"
+ case _ => // Unexpected entity type
+ }
+
+ quotaTypesEnabled = updatedQuotaTypesEnabled
Review Comment:
This flag is used to determine quota metric tags. In most production
systems, I would expect quota types to be stable (i.e., users and client ids may
be added frequently, but switching from user-based quotas to client-id-based
quotas is less likely). In those systems, this PR does not add much value. It
handles the less likely case where quota types are changing. In this case, have
we considered the impact of frequent changes to `quotaTypesEnabled` as entities
are added or removed?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+ QuotaTypes.CustomQuotas
+ } else {
+ QuotaTypes.NoQuotas
+ }
+
+ val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+ activeQuotaEntities.forEach {
Review Comment:
`activeQuotaEntities` is potentially large because it includes all entities
for which quotas are set (every user, for example). Here we just want to find
the first matching entry for each type, so it seems unnecessary to compare every
element using `forEach`. Couldn't we just do some check based on the change
that triggered this? We are holding the write lock here, so it would be good to
avoid unnecessary processing.
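
One way to sketch a change-driven check is to keep a per-type counter and
adjust it only for the entity that was added or removed, so no full scan is
needed under the write lock. Everything here is hypothetical and not part of
the PR: `QuotaTypeTracker` and its methods are illustrative names,
`KafkaQuotaEntity` is simplified to two optional strings, and the flag values
are assumed (ClientIdQuotaEnabled = 1, UserQuotaEnabled = 2,
UserClientIdQuotaEnabled = 4). It also assumes `added` is called once per new
entity and `removed` once per deleted entity, not on every quota update.

```scala
// Hypothetical stand-ins; flag values are assumed for illustration only.
object QuotaTypes {
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4
}

// Simplified entity: the real class wraps user and client-id entity objects.
final case class KafkaQuotaEntity(user: Option[String], clientId: Option[String])

// Counts entities per quota type so the bitmask can be updated from a single
// change instead of iterating over every active entity.
// Not thread-safe: the caller is assumed to hold the write lock.
final class QuotaTypeTracker {
  private val counts = scala.collection.mutable.Map.empty[Int, Int]

  private def flagFor(entity: KafkaQuotaEntity): Int = entity match {
    case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled
    case KafkaQuotaEntity(Some(_), None)    => QuotaTypes.UserQuotaEnabled
    case KafkaQuotaEntity(None, Some(_))    => QuotaTypes.ClientIdQuotaEnabled
    case _                                  => QuotaTypes.NoQuotas
  }

  // Call once when a quota is set for an entity that had none before.
  def added(entity: KafkaQuotaEntity): Unit = {
    val flag = flagFor(entity)
    if (flag != QuotaTypes.NoQuotas) counts.update(flag, counts.getOrElse(flag, 0) + 1)
  }

  // Call once when the last quota for an entity is removed.
  def removed(entity: KafkaQuotaEntity): Unit = {
    val flag = flagFor(entity)
    val current = counts.getOrElse(flag, 0)
    if (current > 0) counts.update(flag, current - 1)
  }

  // Bitwise OR of the flags whose counter is non-zero (at most three entries).
  def quotaTypesEnabled: Int =
    counts.foldLeft(QuotaTypes.NoQuotas) { case (acc, (flag, n)) =>
      if (n > 0) acc | flag else acc
    }
}
```

With this shape, `quotaTypesEnabled` changes only when a counter moves between
zero and non-zero, which also bounds how often the flag can flip as entities
come and go.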