jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1522334545
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 1. The member reported its owned partitions;
// 2. The member just joined or rejoined to group (epoch equals to zero);
// 3. The member's assignment has been updated.
- if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+ if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createResponseAssignment(updatedMember));
}
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId The group id.
+ * @param member The member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ * a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @param ownedTopicPartitions The list of partitions owned by the member. This
+ * is reported in the ConsumerGroupHeartbeat API and
+ * it could be null if not provided.
+ * @param records The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+ private ConsumerGroupMember maybeReconcile(
+ String groupId,
+ ConsumerGroupMember member,
+ BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+ int targetAssignmentEpoch,
+ Assignment targetAssignment,
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+ List<Record> records
+ ) {
+ if (member.isReconciledTo(targetAssignmentEpoch)) {
+ return member;
+ }
+
+ ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+ .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+ .withCurrentPartitionEpoch(currentPartitionEpoch)
+ .withOwnedTopicPartitions(ownedTopicPartitions)
+ .build();
+
+ if (!updatedMember.equals(member)) {
+ records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+ log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+ groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+ formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+ if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+ scheduleConsumerGroupRebalanceTimeout(
+ groupId,
+ updatedMember.memberId(),
+ updatedMember.memberEpoch(),
+ updatedMember.rebalanceTimeoutMs()
+ );
+ } else {
Review Comment:
I see. I think it is a little confusing for people trying to understand the
algorithm, but I agree that it is important for the timeout to apply and be
used in a way that makes sense.

So, just so I get it straight: we have a timeout only for the revocation of
the partitions. If we hit the timeout, the member is fenced, and I assume we can
count those partitions as revoked (or at least we no longer have to wait for
them to be revoked).

Once all partitions are revoked, we can try to assign new partitions to
members. In this case, are we relying on the heartbeat to kick out members if
they don't ack the new assignments?

Just trying to confirm the reason for the separate timeout here. Is it because
revoking is more likely to fail even while the heartbeat still goes through,
whereas the same is not true for assigning?
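
To make the question concrete, here is a minimal sketch of how I read the timeout handling. It is not the coordinator code: `RevocationTimeoutSketch`, `onStateChange`, `membersToFence`, and the `STABLE` state are hypothetical names for illustration, and since the diff is cut off at the `else`, I am only assuming that branch cancels a pending timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a toy model of "a timeout exists only while a member
// still has partitions to revoke". Not the GroupMetadataManager logic.
class RevocationTimeoutSketch {
    // UNREVOKED_PARTITIONS appears in the diff; STABLE is a stand-in for
    // "any other state" and is an assumption of this sketch.
    enum MemberState { STABLE, UNREVOKED_PARTITIONS }

    // memberId -> absolute deadline (ms) for finishing the revocation.
    private final Map<String, Long> revocationDeadlines = new HashMap<>();

    void onStateChange(String memberId, MemberState state, long nowMs, long rebalanceTimeoutMs) {
        if (state == MemberState.UNREVOKED_PARTITIONS) {
            // The member owes us a revocation: start (or restart) its timer.
            revocationDeadlines.put(memberId, nowMs + rebalanceTimeoutMs);
        } else {
            // Assumption: any other state means no revocation is pending,
            // so a previously scheduled timer is dropped.
            revocationDeadlines.remove(memberId);
        }
    }

    // Members whose revocation deadline has passed and would be fenced.
    List<String> membersToFence(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : revocationDeadlines.entrySet()) {
            if (entry.getValue() <= nowMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

In this model a member that is merely slow to ack a new assignment never shows up in `membersToFence`; only the heartbeat/session mechanism could remove it, which is the part I would like confirmed.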