dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567372730
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+ /**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry The SnapshotRegistry.
+ * @param metrics The GroupCoordinatorMetricsShard.
+ * @param classicGroup The converted classic group.
+ * @param topicsImage The TopicsImage for topic id and topic name
conversion.
+ * @param log The logger to use.
+ * @return The created ConsumerGruop.
+ */
+ public static ConsumerGroup fromClassicGroup(
+ SnapshotRegistry snapshotRegistry,
+ GroupCoordinatorMetricsShard metrics,
+ ClassicGroup classicGroup,
+ TopicsImage topicsImage,
+ Logger log
+ ) {
+ String groupId = classicGroup.groupId();
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ consumerGroup.setGroupEpoch(classicGroup.generationId());
+ consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+ classicGroup.allMembers().forEach(classicGroupMember -> {
+ // The new ConsumerGroupMember's assignedPartitions and
targetAssignmentSet need to be the same
+ // in order to keep it stable. Thus, both of them are set to be
classicGroupMember.assignment().
+ // If the consumer's real assigned partitions haven't been updated
according to
+ // classicGroupMember.assignment(), it will retry the request.
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(
+ ByteBuffer.wrap(classicGroupMember.assignment()));
Review Comment:
nit: Should we move the closing parenthesis to new line to be consistent?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+ /**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry The SnapshotRegistry.
+ * @param metrics The GroupCoordinatorMetricsShard.
+ * @param classicGroup The converted classic group.
+ * @param topicsImage The TopicsImage for topic id and topic name
conversion.
+ * @param log The logger to use.
+ * @return The created ConsumerGruop.
+ */
+ public static ConsumerGroup fromClassicGroup(
+ SnapshotRegistry snapshotRegistry,
+ GroupCoordinatorMetricsShard metrics,
+ ClassicGroup classicGroup,
+ TopicsImage topicsImage,
+ Logger log
+ ) {
+ String groupId = classicGroup.groupId();
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ consumerGroup.setGroupEpoch(classicGroup.generationId());
+ consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+ classicGroup.allMembers().forEach(classicGroupMember -> {
+ // The new ConsumerGroupMember's assignedPartitions and
targetAssignmentSet need to be the same
+ // in order to keep it stable. Thus, both of them are set to be
classicGroupMember.assignment().
+ // If the consumer's real assigned partitions haven't been updated
according to
+ // classicGroupMember.assignment(), it will retry the request.
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(
+ ByteBuffer.wrap(classicGroupMember.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(
+
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
Review Comment:
ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]