lucasbru commented on code in PR #19691:
URL: https://github.com/apache/kafka/pull/19691#discussion_r2087438125
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() {
return subtopologies;
}
+ public int endpointInformationEpoch() {
+ return endpointInformationEpoch;
Review Comment:
`StreamsRebalanceData` is used as an interface between the application
thread and the streams thread. The data here should be immutable or otherwise
threadsafe. But I'm not sure we need even access this epoch from the
application thread? I'd consider moving the client-side epoch to
`StreamsGroupHeartbeatManager.HeartbeatState`
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+ if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+ } else {
+ int responseEndpoint = Math.min(group.endpointInformationEpoch(),
memberEndpointEpochInRequest);
Review Comment:
I would always respond with `group.endpointInformationEpoch()` and omit this
if-block.
If the broker-side epoch is lost, it will be less than then member epoch, in
which case we should reset the member epoch to the broker epoch and resend the
endpoint information, just in case.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+ group.setEndpointInformationEpoch(group.endpointInformationEpoch()
+ 1);
Review Comment:
maybe we can move the block around setting endpoint information in the
response below this block.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+ if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
Review Comment:
For the case where we lose the group's endpoint information during
fail-over, shouldn't we also send the information, just in case?
Meaning that the condition should be
```group.endpointInformationEpoch() != memberEndpointEpochInRequest```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]