dajac commented on code in PR #19904:
URL: https://github.com/apache/kafka/pull/19904#discussion_r2131654014
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -204,6 +205,11 @@ public class GroupCoordinatorConfig {
ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from
consumer group to classic group is enabled, " +
ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor
downgrade is enabled.";
+ public static final String
CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG =
"group.consumer.regex.batch.refresh.max.interval.ms";
+ public static final String
CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC = "The interval at which
the group coordinator will refresh " +
+ "the topics matching the group subscribed regexes. This is only
applicable to consumer groups using the consumer group protocol. ";
+ public static final int
CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT = 10 * 60 * 1000; //
10 minutes
+
Review Comment:
We cannot add a new config without a KIP now. We will soon do a KIP to
improve regular expressions, I suggest to include it in that one. In the mean
time, we can keep it internal, mainly for testing purposes. What do you think?
If you agree, let's add a comment to state that it is internal for testing.
Regarding the name, I would go with something simpler:
`group.consumer.regex.refresh.interval.ms`. What do you think?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##########
@@ -308,6 +314,7 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+ .define(CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_CONFIG,
INT, CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DEFAULT,
atLeast(REGEX_BATCH_REFRESH_MIN_INTERVAL_MS + 1), MEDIUM,
CONSUMER_GROUP_REGEX_BATCH_REFRESH_MAX_INTERVAL_MS_DOC)
Review Comment:
nit: Should we require at least 1 minutes to avoid having too many refreshes?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3134,11 +3137,11 @@ private boolean maybeUpdateRegularExpressions(
// 2. The last refresh is older than 10s. If the group does not have
any regular
// expressions but the current member just brought a new one, we
should continue.
long lastRefreshTimeMs =
group.lastResolvedRegularExpressionRefreshTimeMs();
- if (time.milliseconds() <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_INTERVAL_MS) {
+ if (time.milliseconds() <= lastRefreshTimeMs +
REGEX_BATCH_REFRESH_MIN_INTERVAL_MS) {
Review Comment:
nit: Let's extract time into a var (e.g. currentTimeMs) as we reuse it now.
##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -3076,6 +3076,23 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
sendAndReceiveRegexHeartbeat(response, listenerName, None)
}
+ @Test
+ def testConsumerGroupHeartbeatWithRegexWithGrantedTopicDescribeAcl(): Unit =
{
Review Comment:
Should we add a test in which we start with an ACL, subscribe to the topic,
get it, remove the ACL, and finally verify that it is removed after the refresh
timeout?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]