dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567372730


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));

Review Comment:
   nit: Should we move the closing parenthesis to new line to be consistent?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to