dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1502633032
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2079,253 +2056,20 @@ public void testReconciliationProcess() {
assertRecordsEquals(Collections.singletonList(
RecordHelpers.newCurrentAssignmentRecord(groupId, new
ConsumerGroupMember.Builder(memberId3)
+ .setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(11)
- .setTargetMemberEpoch(11)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5),
mkTopicAssignment(barTopicId, 1)))
.build())),
result.records()
);
- assertEquals(ConsumerGroupMember.MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId3));
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId3));
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
context.consumerGroupState(groupId));
}
- @Test
- public void testReconciliationRestartsWhenNewTargetAssignmentIsInstalled()
{
Review Comment:
Correct.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 1. The member reported its owned partitions;
// 2. The member just joined or rejoined to group (epoch equals to
zero);
// 3. The member's assignment has been updated.
- if (ownedTopicPartitions != null || memberEpoch == 0 ||
assignmentUpdated) {
+ if (ownedTopicPartitions != null || memberEpoch == 0 ||
hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createResponseAssignment(updatedMember));
}
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId The group id.
+ * @param member The member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ * a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @param ownedTopicPartitions The list of partitions owned by the
member. This
+ * is reported in the ConsumerGroupHeartbeat
API and
+ * it could be null if not provided.
+ * @param records The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+ private ConsumerGroupMember maybeReconcile(
+ String groupId,
+ ConsumerGroupMember member,
+ BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+ int targetAssignmentEpoch,
+ Assignment targetAssignment,
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
+ List<Record> records
+ ) {
+ if (member.isReconciledTo(targetAssignmentEpoch)) {
+ return member;
+ }
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+ .withCurrentPartitionEpoch(currentPartitionEpoch)
+ .withOwnedTopicPartitions(ownedTopicPartitions)
+ .build();
+
+ if (!updatedMember.equals(member)) {
+ records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+ log.info("[GroupId {}] Member {} new assignment state: epoch={},
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+ groupId, updatedMember.memberId(),
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(),
updatedMember.state(),
+ formatAssignment(updatedMember.assignedPartitions()),
formatAssignment(updatedMember.revokedPartitions()));
+
+ if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+ scheduleConsumerGroupRebalanceTimeout(
+ groupId,
+ updatedMember.memberId(),
+ updatedMember.memberEpoch(),
+ updatedMember.rebalanceTimeoutMs()
+ );
+ } else {
Review Comment:
Yeah, that's right.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,119 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
* @return A new ConsumerGroupMember or the current one.
*/
public ConsumerGroupMember build() {
- // A new target assignment has been installed, we need to restart
- // the reconciliation loop from the beginning.
- if (targetAssignmentEpoch != member.targetMemberEpoch()) {
- return transitionToNewTargetAssignmentState();
- }
-
switch (member.state()) {
- // Check if the partitions have been revoked by the member.
- case REVOKING:
- return maybeTransitionFromRevokingToAssigningOrStable();
+ case STABLE:
+ // When the member is in the STABLE state, we verify if a newer
+ // epoch (or target assignment) is available. If it is, we can
+ // reconcile the member towards it. Otherwise, we return.
+ if (member.memberEpoch() != targetAssignmentEpoch) {
+ return computeNextAssignment(
+ member.memberEpoch(),
+ member.assignedPartitions()
+ );
+ } else {
+ return member;
+ }
- // Check if pending partitions have been freed up.
- case ASSIGNING:
- return maybeTransitionFromAssigningToAssigningOrStable();
+ case UNREVOKED_PARTITIONS:
+ // When the member is in the UNREVOKED_PARTITIONS state, we
wait
+ // until the member has revoked the necessary partitions. They
are
+ // considered revoked when they are no longer reported in the
+ // owned partitions set in the ConsumerGroupHeartbeat API.
- // Nothing to do.
- case STABLE:
- return member;
+ // If the member does not provide its owned partitions, we
cannot
+ // progress.
+ if (ownedTopicPartitions == null) {
+ return member;
+ }
+
+ // If the member provides its owned partitions, we verify whether
it still
+ // owns any of the revoked partitions. If it does, we cannot
progress.
+ for (ConsumerGroupHeartbeatRequestData.TopicPartitions
topicPartitions : ownedTopicPartitions) {
+ for (Integer partitionId : topicPartitions.partitions()) {
+ boolean stillHasRevokedPartition = member
+ .revokedPartitions()
+ .getOrDefault(topicPartitions.topicId(),
Collections.emptySet())
+ .contains(partitionId);
+ if (stillHasRevokedPartition) {
+ return member;
+ }
+ }
+ }
+
+ // When the member has revoked all the pending partitions, it
can
+ // transition to the next epoch (current + 1) and we can
reconcile
+ // its state towards the latest target assignment.
+ return computeNextAssignment(
+ member.memberEpoch() + 1,
Review Comment:
Yep. You got it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1168,36 +1170,15 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 3. Reconcile the member's assignment with the target assignment.
This is only required if
// the member is not stable or if a new target assignment has been
installed.
Review Comment:
I changed the entire comment.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1807,9 +1776,9 @@ public void testReconciliationProcess() {
assertRecordsEquals(Collections.singletonList(
RecordHelpers.newCurrentAssignmentRecord(groupId, new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(10)
- .setPreviousMemberEpoch(9)
- .setTargetMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
Review Comment:
In the [previous
implementation](https://github.com/apache/kafka/pull/15364/files#diff-6c5e23803064f4b7a122eee29736d036b9cfe244e69f48751ba163d62e2bf35fL257),
the previous epoch was not updated, which looks like a bug. In the new
implementation, I set the previous epoch whenever the member epoch is
updated.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 1. The member reported its owned partitions;
Review Comment:
No. This condition is actually slightly incorrect; I will fix it. The idea
is to provide a full response when we receive a full request from the client.
Here, "full" means that all the fields are set.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 1. The member reported its owned partitions;
// 2. The member just joined or rejoined to group (epoch equals to
zero);
// 3. The member's assignment has been updated.
- if (ownedTopicPartitions != null || memberEpoch == 0 ||
assignmentUpdated) {
+ if (ownedTopicPartitions != null || memberEpoch == 0 ||
hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createResponseAssignment(updatedMember));
}
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId The group id.
+ * @param member The member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ * a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @param ownedTopicPartitions The list of partitions owned by the
member. This
+ * is reported in the ConsumerGroupHeartbeat
API and
+ * it could be null if not provided.
+ * @param records The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+ private ConsumerGroupMember maybeReconcile(
+ String groupId,
+ ConsumerGroupMember member,
+ BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+ int targetAssignmentEpoch,
+ Assignment targetAssignment,
+ List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions,
+ List<Record> records
+ ) {
+ if (member.isReconciledTo(targetAssignmentEpoch)) {
+ return member;
+ }
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+ .withCurrentPartitionEpoch(currentPartitionEpoch)
+ .withOwnedTopicPartitions(ownedTopicPartitions)
+ .build();
+
+ if (!updatedMember.equals(member)) {
+ records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+ log.info("[GroupId {}] Member {} new assignment state: epoch={},
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+ groupId, updatedMember.memberId(),
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(),
updatedMember.state(),
+ formatAssignment(updatedMember.assignedPartitions()),
formatAssignment(updatedMember.revokedPartitions()));
+
+ if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+ scheduleConsumerGroupRebalanceTimeout(
+ groupId,
+ updatedMember.memberId(),
+ updatedMember.memberEpoch(),
+ updatedMember.rebalanceTimeoutMs()
+ );
+ } else {
+ cancelConsumerGroupRebalanceTimeout(groupId,
updatedMember.memberId());
+ }
+ }
+
+ return updatedMember;
+ }
+
+ private String formatAssignment(
Review Comment:
We could get the topic name from the MetadataImage, but it may not be available. I think
that using the topic id is fine here.
##########
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##########
@@ -24,20 +24,12 @@
"about": "The current member epoch that is expected from the member in
the heartbeat request." },
{ "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
"about": "If the last epoch bump is lost before reaching the member, the
member will retry with the previous epoch." },
- { "name": "TargetMemberEpoch", "versions": "0+", "type": "int32",
- "about": "The target epoch corresponding to the assignment used to
compute the AssignedPartitions, the PartitionsPendingRevocation and the
PartitionsPendingAssignment fields." },
+ { "name": "State", "versions": "0+", "type": "int8",
+ "about": "The member state. See ConsumerGroupMember.MemberState for the
possible values." },
{ "name": "AssignedPartitions", "versions": "0+", "type":
"[]TopicPartitions",
"about": "The partitions assigned to (or owned by) this member." },
{ "name": "PartitionsPendingRevocation", "versions": "0+", "type":
"[]TopicPartitions",
- "about": "The partitions that must be revoked by this member." },
- { "name": "PartitionsPendingAssignment", "versions": "0+", "type":
"[]TopicPartitions",
- "about": "The partitions that will be assigned to this member when they
are freed up by their current owners." },
- { "name": "Error", "versions": "0+", "type": "int8",
- "about": "The error reported by the assignor." },
- { "name": "MetadataVersion", "versions": "0+", "type": "int16",
- "about": "The version of the metadata bytes." },
- { "name": "MetadataBytes", "versions": "0+", "type": "bytes",
- "about": "The metadata bytes." }
Review Comment:
I thought that we could remove unused fields since we are making backward-incompatible
changes anyway. Those fields would be needed when we implement the
client-side assignor, if we ever do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]