Copilot commented on code in PR #21655:
URL: https://github.com/apache/kafka/pull/21655#discussion_r2895135889


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochsTest.java:
##########
@@ -152,9 +153,16 @@ public void testFromCurrentAssignmentRecordWithMismatchedEpochs() {
             .setPartitions(Arrays.asList(1, 2, 3))
             .setAssignmentEpochs(Arrays.asList(10, 11))); // Only 2 epochs for 3 partitions
 
-        assertThrows(IllegalStateException.class, () ->
-            TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks, List.of(), List.of(), 100)
-        );
+        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TasksTupleWithEpochs.class)) {
+            TasksTupleWithEpochs tuple = TasksTupleWithEpochs.fromCurrentAssignmentRecord(activeTasks, List.of(), List.of(), 100);
+            assertEquals(
+                Map.of(SUBTOPOLOGY_1, Map.of(1, 100, 2, 100, 3, 100)),
+                tuple.activeTasksWithEpochs()
+            );
+            assertEquals(1, appender.getMessages("ERROR").stream()
+                .filter(msg -> msg.contains("Size of assignment epochs 2 is not equal to partitions 3 for subtopology 1."))
+                .count());
+        }

Review Comment:
   This test covers the mismatched-size case, but there’s still an important 
legacy edge case: `assignmentEpochs` may be present as an empty list (vs 
`null`). Given the parsing logic change, please add coverage that 
`setAssignmentEpochs(List.of())` is treated as legacy (fallback to member 
epoch) and does not emit an ERROR log (only true mismatches should).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTupleWithEpochs.java:
##########
@@ -137,19 +141,16 @@ private static Map<String, Map<Integer, Integer>> parseActiveTasksWithEpochs(
 
             Map<Integer, Integer> partitionsWithEpochs = new HashMap<>();
 
-            if (epochs != null && !epochs.isEmpty()) {
-                if (epochs.size() != partitions.size()) {
-                    throw new IllegalStateException(
-                        "Assignment epochs must be provided for all partitions. " +
-                        "Subtopology " + subtopologyId + " has " + partitions.size() +
-                        " partitions but " + epochs.size() + " epochs"
-                    );
-                }
-
+            if (epochs != null && epochs.size() == partitions.size()) {
                 for (int i = 0; i < partitions.size(); i++) {
                     partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
                 }
             } else {
+                if (epochs != null) {
+                    log.error("Size of assignment epochs {} is not equal to partitions {} for subtopology {}. " +
+                            "Using default epoch {} for all partitions.",
+                        epochs.size(), partitions.size(), subtopologyId, memberEpoch);
+                }
                 // Legacy record without epochs: use member epoch as default

Review Comment:
   `assignmentEpochs` previously treated an empty list as the legacy “no 
epochs” case (no exception/log, just fallback to `memberEpoch`). With the new 
condition `epochs != null && epochs.size() == partitions.size()`, an empty 
`epochs` list (size 0) with non-empty `partitions` will now log an ERROR and 
fall back, which can spam logs if legacy records deserialize to an empty list 
rather than `null`. Consider treating `epochs == null || epochs.isEmpty()` as 
legacy (no error), and only logging when `epochs` is non-empty but its size 
mismatches `partitions`.
   ```suggestion
               if (epochs == null || epochs.isEmpty()) {
                   // Legacy record without epochs (or deserialized as empty): use member epoch as default
                   for (Integer partition : partitions) {
                       partitionsWithEpochs.put(partition, memberEpoch);
                   }
               } else if (epochs.size() == partitions.size()) {
                   for (int i = 0; i < partitions.size(); i++) {
                       partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
                   }
               } else {
                   // Non-empty epochs list with size mismatch: log error and fall back to member epoch
                   log.error("Size of assignment epochs {} is not equal to partitions {} for subtopology {}. " +
                           "Using default epoch {} for all partitions.",
                       epochs.size(), partitions.size(), subtopologyId, memberEpoch);
   ```
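
The three-way branch described above can be exercised in isolation. The following is a minimal, self-contained sketch and not the Kafka code: the class `EpochFallbackSketch`, the method `resolve`, and the `errors` list (standing in for `log.error`) are hypothetical names introduced only for illustration. It assumes that a `null` or empty epochs list should be treated as legacy, and that only a non-empty list of the wrong size should count as an error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the branch under review; not the real TasksTupleWithEpochs.
final class EpochFallbackSketch {
    // Collects the messages that the real code would pass to log.error (assumption).
    static final List<String> errors = new ArrayList<>();

    static Map<Integer, Integer> resolve(List<Integer> partitions, List<Integer> epochs, int memberEpoch) {
        Map<Integer, Integer> partitionsWithEpochs = new HashMap<>();
        if (epochs == null || epochs.isEmpty()) {
            // Legacy record (null or empty epochs): silently fall back to the member epoch.
            for (Integer partition : partitions) {
                partitionsWithEpochs.put(partition, memberEpoch);
            }
        } else if (epochs.size() == partitions.size()) {
            // Sizes match: pair each partition with its own epoch.
            for (int i = 0; i < partitions.size(); i++) {
                partitionsWithEpochs.put(partitions.get(i), epochs.get(i));
            }
        } else {
            // Non-empty list of the wrong size: record an error, then fall back.
            errors.add("Size of assignment epochs " + epochs.size()
                + " is not equal to partitions " + partitions.size());
            for (Integer partition : partitions) {
                partitionsWithEpochs.put(partition, memberEpoch);
            }
        }
        return partitionsWithEpochs;
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(1, 2, 3);
        System.out.println(resolve(partitions, null, 100));            // legacy: null
        System.out.println(resolve(partitions, List.of(), 100));       // legacy: empty
        System.out.println(resolve(partitions, List.of(10, 11), 100)); // true mismatch
        System.out.println("errors recorded: " + errors.size());
    }
}
```

In this sketch only the mismatched call adds an entry to `errors`; the `null` and empty cases fall back without one, which is the behavior the requested test coverage would need to assert.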


