dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568962281
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
}
}
+ public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String
memberId) {
+ if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+ log.info("Cannot downgrade consumer group {} to classic group
because the online downgrade is disabled.",
+ consumerGroup.groupId());
+ return false;
+ } else if
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+ log.debug("Cannot downgrade consumer group {} to classic group
because not all its members use the classic protocol.",
+ consumerGroup.groupId());
+ return false;
+ } else if (consumerGroup.numMembers() <= 1) {
+ log.info("Skip downgrading the consumer group {} to classic group
because it's empty.",
+ consumerGroup.groupId());
+ return false;
+ } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+ log.info("Cannot downgrade consumer group {} to classic group
because its group size is greater than classic group max size.",
+ consumerGroup.groupId());
+ }
+ return true;
+ }
+
+ public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup
consumerGroup, String leavingMemberId, List<Record> records) {
+ consumerGroup.createGroupTombstoneRecords(records);
+ ClassicGroup classicGroup;
+ try {
+ classicGroup = consumerGroup.toClassicGroup(
+ leavingMemberId,
+ logContext,
+ time,
+ consumerGroupSessionTimeoutMs,
+ metadataImage,
+ records
+ );
+ } catch (SchemaException e) {
+ log.warn("Cannot downgrade the consumer group " +
consumerGroup.groupId() + ": fail to parse " +
+ "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE +
".", e);
+
+ throw new GroupIdNotFoundException(String.format("Cannot downgrade
the classic group %s: %s.",
+ consumerGroup.groupId(), e.getMessage()));
+ }
+
+ groups.put(consumerGroup.groupId(), classicGroup);
+ metrics.onClassicGroupStateTransition(null,
classicGroup.currentState());
+
+ CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+ appendFuture.whenComplete((__, t) -> {
+ if (t == null) {
+ classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+ prepareRebalance(classicGroup, String.format("Downgrade group
%s.", classicGroup.groupId()));
Review Comment:
If creating all the state immediately, we'll get error when replaying the
old ConsumerGroup tombstone because `group.get(groupId)` has become a
ClassicGroup. We can't rely on replaying the records to update the states
either, because we need the new classicGroup reference to trigger the
rebalance. so the only way is not to replay the records by setting the
appendFuture.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]