squah-confluent commented on code in PR #19967:
URL: https://github.com/apache/kafka/pull/19967#discussion_r2307416991
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties
newGroupConfig) {
throw new InvalidRequestException("Group name can't be empty.");
}
+ if (groupConfigListener != null) {
+ groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+ }
+
Review Comment:
`newGroupConfig` contains the entire set of config overrides for the group,
while the config listener expects only the changed values. This means we'll
trigger spurious rebalances when a group has a
`STREAMS_NUM_STANDBY_REPLICAS_CONFIG` config override and _any_ additional
config override is added or updated for that group.
We could try to compute the diff between `newConfig` below and the existing
config to address this.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties
newGroupConfig) {
throw new InvalidRequestException("Group name can't be empty.");
}
+ if (groupConfigListener != null) {
+ groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
+ }
+
Review Comment:
`GroupConfigManager.updateGroupConfig` is called:
* On broker startup, when all group config overrides are replayed.
* When new group config overrides are configured.
The broker startup call is problematic because it can trigger spurious
rebalances depending on how quickly the group coordinator loads. I can't see a
good solution to this at the moment.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1913,6 +1914,11 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// to persist the change, and bump the group epoch later.
boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId,
member, updatedMember, records);
+ // Check if StreamsGroup has flag rebalanceRequired equals to true
+ if (group.rebalanceRequired()) {
+ bumpGroupEpoch = true;
+ }
+
Review Comment:
Instead of adding a new flag, we can bump the epoch directly when a config
of interest changes and this will trigger a new assignment on the next
heartbeat. We do this in `handleRegularExpressionsResult` to trigger a new
assignment on the next heartbeat.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -52,6 +54,10 @@ public void updateGroupConfig(String groupId, Properties
newGroupConfig) {
throw new InvalidRequestException("Group name can't be empty.");
}
+ if (groupConfigListener != null) {
+ groupConfigListener.onGroupConfigUpdate(groupId, newGroupConfig);
Review Comment:
If `GroupCoordinatorService.onGroupConfigUpdate` throws an exception when
not active, we will not update `configMap` below.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -2212,6 +2212,15 @@ public void onNewMetadataImage(
runtime.onNewMetadataImage(newImage, delta);
}
+ /**
+ * See {@link GroupCoordinator#onGroupConfigUpdate(String, Properties)}.
+ */
+ @Override
+ public void onGroupConfigUpdate(String groupId, Properties
updatedProperties) {
+ throwIfNotActive();
+ runtime.onGroupConfigUpdate(groupId, updatedProperties);
+ }
Review Comment:
We can avoid changing the coordinator runtime entirely. A group is owned by
a single shard/partition and we can get that partition using
`topicPartitionFor(groupId)`.
You can look at the `streamsGroupHeartbeat` implementation for an example.
```java
runtime.scheduleWriteOperation(
...,
topicPartitionFor(groupId),
...,
coordinator -> coordinator.onGroupConfigUpdate(groupId,
updatedProperties)
);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##########
@@ -36,6 +36,8 @@ public class GroupConfigManager implements AutoCloseable {
private final Map<String, GroupConfig> configMap;
+ private volatile GroupCoordinator groupConfigListener;
Review Comment:
Since we don't need the entire set of `GroupCoordinator` methods, I would
propose defining an inner interface and have `GroupCoordinatorService`
implement it:
```java
public interface Listener {
void onGroupConfigUpdate(String groupId, Properties
updatedProperties);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]