lucasbru commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2410845555


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16035,7 +16038,7 @@ public void testStreamsGroupMemberEpochValidation() {
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 100, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 100, 0, 0));

Review Comment:
   Here, we do have an assignment, so at this stage the topology should be 
validated. 
   
   I changed this further above, where we did not have a validated topology but 
an assignment, which can happen only temporarily if a topic is resized, so it's 
kind of a corner case.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -148,9 +148,9 @@ public static class DeadlineAndEpoch {
     private final TimelineHashMap<String, String> staticMembers;
 
     /**
-     * The metadata associated with each subscribed topic name.
+     * The topology epoch for which the subscribed topics identified by 
metadataHash are validated.
      */
-    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+    protected final TimelineInteger validatedTopologyEpoch;

Review Comment:
   Good catch! Fixed.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java:
##########
@@ -250,19 +250,20 @@ public void testNewStreamsGroupMemberTombstoneRecord() {
     }
 
     @Test
-    public void testNewStreamsGroupEpochRecord() {
+    public void testNewStreamsGroupMetadataRecord() {
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
             new StreamsGroupMetadataKey()
                 .setGroupId(GROUP_ID),
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
                     .setEpoch(42)
-                    .setMetadataHash(42),
+                    .setMetadataHash(42)
+                    .setValidatedTopologyEpoch(43),
                 (short) 0
             )
         );
 
-        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 42, 
43));

Review Comment:
   Good point. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to