mjsax commented on code in PR #19691:
URL: https://github.com/apache/kafka/pull/19691#discussion_r2085659458
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -355,6 +357,9 @@ public Map<String, Subtopology> subtopologies() {
return subtopologies;
}
+ public int endpointInformationEpoch() {
+ return endpointInformationEpoch;
+ }
public int topologyEpoch() {
Review Comment:
nit: missing empty line
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -329,6 +329,8 @@ public String toString() {
private final AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new AtomicReference<>(List.of());
+ private int endpointInformationEpoch = 0;
Review Comment:
Should we init this with `-1`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
- .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+ if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+ response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+ } else {
+ int responseEndpoint = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest);
Review Comment:
We do not persist the endpoint epoch, right? Is that the case you are
talking about?
But I thought that in this case `group.endpointInformationEpoch()` would be `-1`
(or whatever value we initialize it to, i.e. unknown), so
`memberEndpointEpochInRequest` should always be larger?
Correct me if I am wrong. The end-to-end control flow is not totally clear
to me at the moment.
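To make the question concrete, here is a minimal sketch of the branch as I read it from the diff. `EndpointEpochSketch`, `Decision`, and `UNKNOWN_EPOCH` are hypothetical names, `-1` as the "unknown" value is an assumption, and the hunk ends before showing how the `Math.min` result is used, so returning it as the response epoch is also an assumption.

```java
final class EndpointEpochSketch {
    // Assumption: -1 marks "no endpoint epoch known yet", e.g. on a
    // coordinator that just loaded the group and has no persisted epoch.
    static final int UNKNOWN_EPOCH = -1;

    // Hypothetical holder for the two things the branch decides.
    static final class Decision {
        final boolean includeEndpoints;
        final int responseEpoch;

        Decision(boolean includeEndpoints, int responseEpoch) {
            this.includeEndpoints = includeEndpoints;
            this.responseEpoch = responseEpoch;
        }

        @Override
        public String toString() {
            return "Decision(includeEndpoints=" + includeEndpoints
                + ", responseEpoch=" + responseEpoch + ")";
        }
    }

    static Decision decide(int groupEpoch, int memberEpoch) {
        if (groupEpoch > memberEpoch) {
            // Member is behind: send endpoint data plus the group's epoch.
            return new Decision(true, groupEpoch);
        }
        // Member is level with or ahead of the group: no endpoint data.
        // Assumption: the smaller of the two epochs is echoed back.
        return new Decision(false, Math.min(groupEpoch, memberEpoch));
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 1));             // member is behind
        System.out.println(decide(UNKNOWN_EPOCH, 5)); // group epoch not known
    }
}
```

Under these assumptions, a coordinator whose epoch is back at `-1` takes the else branch for a member at epoch 5 and sends no endpoint data, which is the flow I would like to have confirmed.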
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1983,8 +1984,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
- .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+ if (group.endpointInformationEpoch() > memberEndpointEpochInRequest) {
+ response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+ response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+ } else {
+ int responseEndpoint = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest);
Review Comment:
```suggestion
int responseEndpointInformationEpoch = Math.min(group.endpointInformationEpoch(), memberEndpointEpochInRequest);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
String processId,
Endpoint userEndpoint,
List<KeyValue> clientTags,
- boolean shutdownApplication
+ boolean shutdownApplication,
+ int memberEndpointEpochInRequest
Review Comment:
None of the other variables use an `InRequest` suffix.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -205,6 +205,12 @@ public static class DeadlineAndEpoch {
*/
private Optional<String> shutdownRequestMemberId = Optional.empty();
+ /**
+ * The current epoch for endpoint information, this is used to determine
when to send
+ * updated endpoint information to members of the group.
+ */
+ private int endpointInformationEpoch;
Review Comment:
Should we init this to `-1`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1993,6 +2002,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+ group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
Review Comment:
Do we need to change the response too, so that it uses the bumped epoch and
also ensures the endpoint information is set?
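One possible shape of what I am asking about, as a sketch only. `Group`, `Response`, and `onEndpointsChanged` are hypothetical stand-ins, not the real `StreamsGroup` or `StreamsGroupHeartbeatResponseData` APIs, and the initial value `-1` is an assumption. The point is just that the response is filled in after the bump, so it carries both the new epoch and the endpoint data.

```java
import java.util.List;
import java.util.Map;

final class BumpThenRespondSketch {
    // Hypothetical stand-in for the group state.
    // Assumption: -1 is the initial "unknown" epoch.
    static final class Group {
        int endpointInformationEpoch = -1;
        Map<String, List<Integer>> partitionsByEndpoint = Map.of();
    }

    // Hypothetical stand-in for the heartbeat response.
    static final class Response {
        int endpointInformationEpoch;
        Map<String, List<Integer>> partitionsByEndpoint;
    }

    static Response onEndpointsChanged(Group group, Map<String, List<Integer>> updated) {
        // Update the group state and bump the epoch first ...
        group.partitionsByEndpoint = updated;
        group.endpointInformationEpoch += 1;

        // ... then build the response from the already-bumped state.
        Response response = new Response();
        response.endpointInformationEpoch = group.endpointInformationEpoch;
        response.partitionsByEndpoint = group.partitionsByEndpoint;
        return response;
    }

    public static void main(String[] args) {
        Group group = new Group();
        Response response = onEndpointsChanged(group, Map.of("host-a:8080", List.of(0, 1)));
        System.out.println(response.endpointInformationEpoch + " " + response.partitionsByEndpoint);
    }
}
```

If the bump stays at the end of the method as in the diff, the member that triggered it would presumably only see the new epoch on its next heartbeat.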
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1842,7 +1842,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
String processId,
Endpoint userEndpoint,
List<KeyValue> clientTags,
- boolean shutdownApplication
+ boolean shutdownApplication,
+ int memberEndpointEpochInRequest
Review Comment:
```suggestion
int memberEndpointEpoch
```