dajac commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2123076152
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
Review Comment:
nit: There is an extra space after `=`.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+ QuotaTypes.CustomQuotas
+ } else {
+ QuotaTypes.NoQuotas
+ }
+
+ activeQuotaEntities.forEach {
+ case KafkaQuotaEntity(Some(_), Some(_)) =>
+ quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+ case KafkaQuotaEntity(Some(_), None) =>
+ quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+ case KafkaQuotaEntity(None, Some(_)) =>
+ quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+ case _ => // Unexpected entity type
+ }
Review Comment:
I suppose that `activeQuotaEntities` could be rather large. Does it have any
performance impact? We are basically moving from a O(1) operation to O(N) here.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+ QuotaTypes.CustomQuotas
+ } else {
+ QuotaTypes.NoQuotas
+ }
+
+ activeQuotaEntities.forEach {
+ case KafkaQuotaEntity(Some(_), Some(_)) =>
+ quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+ case KafkaQuotaEntity(Some(_), None) =>
+ quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+ case KafkaQuotaEntity(None, Some(_)) =>
+ quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+ case _ => // Unexpected entity type
+ }
+
+ val activeEntities = if (activeQuotaEntities.isEmpty) "No active quota
entities" else activeQuotaEntities.asScala.map(_.toString).mkString(", ")
+ info(s"Quota types enabled has been changed with active quota entities:
[$activeEntities]")
Review Comment:
Generating long string could be costly. Does it bring real value? Is this
something that we could log in debug?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,19 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
- if (userEntity.nonEmpty) {
- if (quotaEntity.clientIdEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- else
- quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- } else if (clientEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
quota match {
- case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType,
quotaEntity, newQuota.bound)
- case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+ case Some(newQuota) =>
+ quotaCallback.updateQuota(clientQuotaType, quotaEntity,
newQuota.bound)
+ if(activeQuotaEntities.add(quotaEntity)){
Review Comment:
nit: A space misses after `if` and another before `{`.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -533,6 +561,9 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
throttledChannelReaper.awaitShutdown()
}
+ // Visible for testing
+ def getQuotaTypesEnabled: Int = quotaTypesEnabled
Review Comment:
nit: Should we just make `quotaTypesEnabled` package private?
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,19 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
- if (userEntity.nonEmpty) {
- if (quotaEntity.clientIdEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
- else
- quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
- } else if (clientEntity.nonEmpty)
- quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
quota match {
- case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType,
quotaEntity, newQuota.bound)
- case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+ case Some(newQuota) =>
+ quotaCallback.updateQuota(clientQuotaType, quotaEntity,
newQuota.bound)
+ if(activeQuotaEntities.add(quotaEntity)){
+ updateQuotaTypes()
+ }
+ case None =>
+ quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+ if (activeQuotaEntities.remove(quotaEntity)){
Review Comment:
nit: A space misses before `{`.
##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,35 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
}
}
+ /**
+ * Helper method to update quotaTypesEnabled which is a bitwise OR
combination of the enabled quota types.
+ * For example:
+ * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then
quotaTypesEnabled = 3 (2 | 1 = 3)
+ * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then
quotaTypesEnabled = (4 | 1 = 5)
+ * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then
quotaTypesEnabled = 6 (4 | 2 = 6)
+ * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+ */
+ private def updateQuotaTypes(): Unit = {
+ quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
Review Comment:
`quotaTypesEnabled` is a volatile variable. This suggests that we can access
it anytime. In this method, we update it in a non atomic manner. I wonder if
this could have any undesired impact. It would be better to use a local
variable and to update `quotaTypesEnabled` at the end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]