squah-confluent commented on code in PR #21558:
URL: https://github.com/apache/kafka/pull/21558#discussion_r2875882416
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -264,20 +292,23 @@ public ConsumerGroupMember build() {
private final String serverAssignorName;
/**
- * The partitions assigned to this member.
+ * The partitions assigned to this member and their assignment epochs.
+ * A map of topic ids to partitions to assignment epochs.
*/
- private final Map<Uuid, Set<Integer>> assignedPartitions;
+ private final Map<Uuid, Map<Integer, Integer>> assignedPartitions;
/**
- * The partitions being revoked by this member.
+ * The partitions awaiting revocation from this member and their
assignment epochs.
+ * A map of topic ids to partitions to assignment epochs.
*/
- private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+ private final Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocation;
/**
* The classic member metadata if the consumer uses the classic protocol.
*/
private final ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata;
+
Review Comment:
nit: stray newline change
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -432,7 +495,7 @@ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(
.setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
- private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromMap(
+ private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromAssignmentWithoutEpochs(
Review Comment:
Since we've chosen to remove the `withEpochs` variant of this method, I
would just call it `topicPartitionsFromAssignment`.
```suggestion
private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromAssignment(
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -380,37 +381,39 @@ private ConsumerGroupMember computeNextAssignment(
for (Uuid topicId : allTopicIds) {
Set<Integer> target = targetAssignment.partitions()
.getOrDefault(topicId, Set.of());
- Set<Integer> currentAssignedPartitions = memberAssignedPartitions
- .getOrDefault(topicId, Set.of());
+ Map<Integer, Integer> currentAssignedPartitions =
memberAssignedPartitions
+ .getOrDefault(topicId, Map.of());
// If the member is no longer subscribed to the topic, treat its
target assignment as empty.
if (!subscribedTopicIds.contains(topicId)) {
target = Set.of();
}
// New Assigned Partitions = Previous Assigned Partitions ∩ Target
- Set<Integer> assignedPartitions = new
HashSet<>(currentAssignedPartitions);
- assignedPartitions.retainAll(target);
+ Map<Integer, Integer> assignedPartitions = new
HashMap<>(currentAssignedPartitions);
+ assignedPartitions.keySet().retainAll(target);
// Partitions Pending Revocation = Previous Assigned Partitions -
New Assigned Partitions
- Set<Integer> partitionsPendingRevocation = new
HashSet<>(currentAssignedPartitions);
- partitionsPendingRevocation.removeAll(assignedPartitions);
+ Map<Integer, Integer> partitionsPendingRevocation = new
HashMap<>(currentAssignedPartitions);
+
partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
// Partitions Pending Assignment = Target - New Assigned
Partitions - Unreleased Partitions
Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
- partitionsPendingAssignment.removeAll(assignedPartitions);
+ partitionsPendingAssignment.removeAll(assignedPartitions.keySet());
hasUnreleasedPartitions =
partitionsPendingAssignment.removeIf(partitionId ->
currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
// Don't consider a partition unreleased if it is owned by the
current member
// because it is pending revocation. This is safe to do since
only a single member
// can own a partition at a time.
- !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Map.of()).containsKey(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions, preserve existing
epochs
if (!assignedPartitions.isEmpty()) {
newAssignedPartitions.put(topicId, assignedPartitions);
}
+ // Build epochs map for partitions pending revocation, preserve
existing epochs
Review Comment:
I'm not sure this comment is necessary any more.
```suggestion
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -554,16 +561,20 @@ public void
testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocati
.setPartitions(Arrays.asList(5, 6))))
.build();
+ // Retained partitions keep original epoch (10), partition 4 was
pending revocation so gets new epoch (12),
+ // new partition 7 also gets new epoch (12)
+ Map<Uuid, Map<Integer, Integer>> expectedAssignment = Map.of(
+ topicId1, Map.of(2, 10, 3, 10, 4, 12),
+ topicId2, Map.of(5, 10, 6, 10, 7, 12)
+ );
+
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(12)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(topicId1, 2, 3, 4),
- mkTopicAssignment(topicId2, 5, 6, 7)))
- .setPartitionsPendingRevocation(Map.of())
+ .setAssignedPartitions(expectedAssignment)
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(topicId1, 10, 2, 3),
mkTopicAssignmentWithEpochs(topicId1, 12, 4),
mkTopicAssignmentWithEpochs(topicId2, 10, 5, 6),
mkTopicAssignmentWithEpochs(topicId2, 12, 7)))
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -804,13 +804,19 @@ public static CoordinatorRecord
newShareGroupStatePartitionMetadataRecord(
}
private static
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
toTopicPartitions(
- Map<Uuid, Set<Integer>> topicPartitions
- ) {
- List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics
= new ArrayList<>(topicPartitions.size());
- topicPartitions.forEach((topicId, partitions) ->
+ Map<Uuid, Map<Integer, Integer>> topicPartitionsWithEpochs
Review Comment:
Could we rename the parameter to `assignment`?
```suggestion
Map<Uuid, Map<Integer, Integer>> assignment
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -380,37 +381,39 @@ private ConsumerGroupMember computeNextAssignment(
for (Uuid topicId : allTopicIds) {
Set<Integer> target = targetAssignment.partitions()
.getOrDefault(topicId, Set.of());
- Set<Integer> currentAssignedPartitions = memberAssignedPartitions
- .getOrDefault(topicId, Set.of());
+ Map<Integer, Integer> currentAssignedPartitions =
memberAssignedPartitions
+ .getOrDefault(topicId, Map.of());
// If the member is no longer subscribed to the topic, treat its
target assignment as empty.
if (!subscribedTopicIds.contains(topicId)) {
target = Set.of();
}
// New Assigned Partitions = Previous Assigned Partitions ∩ Target
- Set<Integer> assignedPartitions = new
HashSet<>(currentAssignedPartitions);
- assignedPartitions.retainAll(target);
+ Map<Integer, Integer> assignedPartitions = new
HashMap<>(currentAssignedPartitions);
+ assignedPartitions.keySet().retainAll(target);
// Partitions Pending Revocation = Previous Assigned Partitions -
New Assigned Partitions
- Set<Integer> partitionsPendingRevocation = new
HashSet<>(currentAssignedPartitions);
- partitionsPendingRevocation.removeAll(assignedPartitions);
+ Map<Integer, Integer> partitionsPendingRevocation = new
HashMap<>(currentAssignedPartitions);
+
partitionsPendingRevocation.keySet().removeAll(assignedPartitions.keySet());
// Partitions Pending Assignment = Target - New Assigned
Partitions - Unreleased Partitions
Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
- partitionsPendingAssignment.removeAll(assignedPartitions);
+ partitionsPendingAssignment.removeAll(assignedPartitions.keySet());
hasUnreleasedPartitions =
partitionsPendingAssignment.removeIf(partitionId ->
currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
// Don't consider a partition unreleased if it is owned by the
current member
// because it is pending revocation. This is safe to do since
only a single member
// can own a partition at a time.
- !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Map.of()).containsKey(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions, preserve existing
epochs
Review Comment:
I'm not sure this comment is necessary any more.
```suggestion
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -291,25 +291,26 @@ private boolean ownsRevokedPartitions(
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
* @param memberEpoch The epoch of the member to use.
- * @param memberAssignedPartitions The assigned partitions of the member
to use.
+ * @param memberAssignedPartitions The assigned partitions of the member
to use and their assignment epochs.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
int memberEpoch,
- Map<Uuid, Set<Integer>> memberAssignedPartitions
+ Map<Uuid, Map<Integer, Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitions;
+ Map<Uuid, Map<Integer, Integer>> newPartitionsPendingRevocation =
Map.of();
+
if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
newAssignedPartitions = Map.of();
+ // Move all assigned to pending revocation
newPartitionsPendingRevocation = memberAssignedPartitions;
} else {
newAssignedPartitions = memberAssignedPartitions;
Review Comment:
Could we initialize `newPartitionsPendingRevocation` in the else branch, but
not clone `member.partitionsPendingRevocation`? Initializing to an empty map
sets up a footgun later on if the else branch was taken, where using
`newPartitionsPendingRevocation` is a bug unless a topic became unsubscribed.
I missed this last time, sorry.
```suggestion
newAssignedPartitions = memberAssignedPartitions;
newPartitionsPendingRevocation =
member.partitionsPendingRevocation();
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1043,20 +1043,20 @@ private void maybeRemovePartitionEpoch(
}
/**
- * Removes the partition epochs based on the provided assignment.
+ * Removes the partition epochs based on the provided assignment and
member epoch.
*
- * @param assignment The assignment.
- * @param expectedEpoch The expected epoch.
+ * @param assignment The assignment with epochs. The assignment epochs
are ignored.
+ * @param expectedEpoch The expected member epoch.
* package-private for testing.
*/
void removePartitionEpochs(
- Map<Uuid, Set<Integer>> assignment,
+ Map<Uuid, Map<Integer, Integer>> assignment,
int expectedEpoch
) {
- assignment.forEach((topicId, assignedPartitions) -> {
+ assignment.forEach((topicId, partitionEpochMap) -> {
Review Comment:
Thanks for fixing up the names. We're still inconsistent though.
We call it `partitionEpochMap` in `ConsumerGroup.addPartitionEpochs`,
`removePartitionEpochs` and `GroupCoordinatorRecordHelpers.toTopicPartitions`.
We call it `partitionEpochs` in
`ConsumerGroupMember.resetAssignedPartitionsEpochsToZero`, `assignmentEpoch`,
`pendingRevocationEpoch`, `Utils.assignmentFromTopicPartitions`,
`Utils.toAssignmentWithEpochs`,
`AssignmentTestUtil.mkTopicAssignmentWithEpochs` and
`CurrentAssignmentBuilderBenchmark.setupMember`.
It looks like `partitionEpochs` is the more natural name since we end up
using it more often?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -71,6 +71,35 @@ public static Map<Uuid, Set<Integer>>
mkOrderedAssignment(Map.Entry<Uuid, Set<In
return Collections.unmodifiableMap(assignment);
}
+ public static Map.Entry<Uuid, Map<Integer, Integer>>
mkTopicAssignmentWithEpochs(
+ Uuid topicId,
+ int epoch,
+ Integer... partitions
+ ) {
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partition : partitions) {
+ partitionEpochs.put(partition, epoch);
+ }
+ return new AbstractMap.SimpleEntry<>(topicId, partitionEpochs);
+ }
+
+ @SafeVarargs
+ public static Map<Uuid, Map<Integer, Integer>> mkAssignmentWithEpochs(
+ Map.Entry<Uuid, Map<Integer, Integer>>... entries
+ ) {
+ Map<Uuid, Map<Integer, Integer>> assignment = new HashMap<>();
+ for (Map.Entry<Uuid, Map<Integer, Integer>> entry : entries) {
+ assignment.merge(entry.getKey(), new HashMap<>(entry.getValue()),
(existing, newValue) -> {
+ Map<Integer, Integer> merged = new HashMap<>(existing);
+ merged.putAll(newValue);
+ return merged;
+ });
Review Comment:
Does this work?
```suggestion
assignment
.computeIfAbsent(entry.getKey(), __ -> new HashMap<>())
.putAll(entry.getValue());
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2476,9 +2489,7 @@ member2RejoinId, new MemberAssignmentImpl(mkAssignment(
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(List.of("foo", "bar"))
.setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 3, 4, 5),
- mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .setAssignedPartitions(expectedRejoinedAssignment)
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(fooTopicId, 0, 3, 4, 5),
mkTopicAssignmentWithEpochs(barTopicId, 11, 0, 1, 2)))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -235,4 +239,43 @@ void testComputeGroupHashWithSameKeyButDifferentValue() {
);
assertNotEquals(Utils.computeGroupHash(map1),
Utils.computeGroupHash(map2));
}
+
+ @Test
+ void testAssignmentFromTopicPartitionsWithNegativeDefaultEpoch() {
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions = List.of(
+ new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(FOO_TOPIC_ID)
+ .setPartitions(Arrays.asList(0, 1, 2))
+ );
+
+ Map<Uuid, Map<Integer, Integer>> result =
Utils.assignmentFromTopicPartitions(
+ topicPartitions,
+ LEAVE_GROUP_STATIC_MEMBER_EPOCH // -2
+ );
+
+ // Verify epoch is adjusted to 0
+ assertEquals(Map.of(
+ FOO_TOPIC_ID, Map.of(0, 0, 1, 0, 2, 0)
+ ), result);
+ }
+
+ @Test
+ void testAssignmentFromTopicPartitionsWithEpochsProvided() {
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions = List.of(
+ new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(FOO_TOPIC_ID)
+ .setPartitions(Arrays.asList(0, 1, 2))
+ .setAssignmentEpochs(Arrays.asList(5, 6, 7))
+ );
+
+ Map<Uuid, Map<Integer, Integer>> result =
Utils.assignmentFromTopicPartitions(
+ topicPartitions,
+ LEAVE_GROUP_STATIC_MEMBER_EPOCH // -2
+ );
+
+ // Verify assignment epochs are used
+ assertEquals(Map.of(
+ FOO_TOPIC_ID, Map.of(0, 5, 1, 6, 2, 7)
+ ), result);
Review Comment:
Can we use `mkAssignmentWithEpochs` (with `mkTopicAssignmentWithEpochs`) to construct the expected result?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3561,14 +3574,18 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
result.response()
);
+ // member2: partition 3 (fooTopicId) and 2 (barTopicId) were retained
from epoch 10,
+ // partition 2 (fooTopicId) is newly assigned at epoch 11
+ Map<Uuid, Map<Integer, Integer>> member2ExpectedAssignment = Map.of(
+ fooTopicId, new HashMap<>(Map.of(2, 11, 3, 10)),
+ barTopicId, Map.of(2, 10)
+ );
assertRecordsEquals(List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 2, 3),
- mkTopicAssignment(barTopicId, 2)))
+ .setAssignedPartitions(member2ExpectedAssignment)
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(fooTopicId, 11, 2),
mkTopicAssignmentWithEpochs(fooTopicId, 10, 3),
mkTopicAssignmentWithEpochs(barTopicId, 10, 2)))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -71,6 +71,35 @@ public static Map<Uuid, Set<Integer>>
mkOrderedAssignment(Map.Entry<Uuid, Set<In
return Collections.unmodifiableMap(assignment);
}
+ public static Map.Entry<Uuid, Map<Integer, Integer>>
mkTopicAssignmentWithEpochs(
+ Uuid topicId,
+ int epoch,
+ Integer... partitions
+ ) {
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partition : partitions) {
+ partitionEpochs.put(partition, epoch);
+ }
+ return new AbstractMap.SimpleEntry<>(topicId, partitionEpochs);
+ }
+
+ @SafeVarargs
+ public static Map<Uuid, Map<Integer, Integer>> mkAssignmentWithEpochs(
+ Map.Entry<Uuid, Map<Integer, Integer>>... entries
+ ) {
+ Map<Uuid, Map<Integer, Integer>> assignment = new HashMap<>();
+ for (Map.Entry<Uuid, Map<Integer, Integer>> entry : entries) {
+ assignment.merge(entry.getKey(), new HashMap<>(entry.getValue()),
(existing, newValue) -> {
+ Map<Integer, Integer> merged = new HashMap<>(existing);
+ merged.putAll(newValue);
+ return merged;
+ });
+ }
+ Map<Uuid, Map<Integer, Integer>> result = new LinkedHashMap<>();
+ assignment.forEach((k, v) -> result.put(k,
Collections.unmodifiableMap(v)));
+ return Collections.unmodifiableMap(result);
Review Comment:
The `LinkedHashMap` is populated by iterating over the `HashMap`, so its ordering will be the same as the `HashMap`'s and the extra copy is not needed.
```suggestion
assignment.replaceAll((__, innerMap) ->
Collections.unmodifiableMap(innerMap));
return Collections.unmodifiableMap(assignment);
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##########
@@ -235,4 +239,43 @@ void testComputeGroupHashWithSameKeyButDifferentValue() {
);
assertNotEquals(Utils.computeGroupHash(map1),
Utils.computeGroupHash(map2));
}
+
+ @Test
+ void testAssignmentFromTopicPartitionsWithNegativeDefaultEpoch() {
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions = List.of(
+ new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(FOO_TOPIC_ID)
+ .setPartitions(Arrays.asList(0, 1, 2))
+ );
+
+ Map<Uuid, Map<Integer, Integer>> result =
Utils.assignmentFromTopicPartitions(
+ topicPartitions,
+ LEAVE_GROUP_STATIC_MEMBER_EPOCH // -2
+ );
+
+ // Verify epoch is adjusted to 0
+ assertEquals(Map.of(
+ FOO_TOPIC_ID, Map.of(0, 0, 1, 0, 2, 0)
+ ), result);
Review Comment:
Can we use `mkAssignmentWithEpochs` (with `mkTopicAssignmentWithEpochs`) to construct the expected result?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1477,9 +1481,9 @@ public void
testUpdatingSubscriptionTriggersNewTargetAssignment() {
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of("foo", "bar"))
.setServerAssignorName("range")
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
- mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .setAssignedPartitions(Map.of(
+ fooTopicId,
toAssignmentWithEpochs(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2, 3,
4, 5)), 10).get(fooTopicId),
+ barTopicId,
toAssignmentWithEpochs(mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)),
11).get(barTopicId)))
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(fooTopicId, 10, 0, 1, 2, 3, 4,
5),
mkTopicAssignmentWithEpochs(barTopicId, 11, 0, 1, 2)))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java:
##########
@@ -114,15 +115,19 @@ public void testStableToStableWithNewPartitions() {
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
+ // Retained partitions keep their original epoch (10), new partitions
get the new epoch (11)
+ Map<Uuid, Map<Integer, Integer>> expectedAssignment = Map.of(
+ topicId1, Map.of(1, 10, 2, 10, 3, 10, 4, 11),
+ topicId2, Map.of(4, 10, 5, 10, 6, 10, 7, 11)
+ );
+
assertEquals(
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setSubscribedTopicNames(List.of(topic1, topic2))
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(topicId1, 1, 2, 3, 4),
- mkTopicAssignment(topicId2, 4, 5, 6, 7)))
+ .setAssignedPartitions(expectedAssignment)
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(topicId1, 10, 1, 2, 3),
mkTopicAssignmentWithEpochs(topicId1, 11, 4),
mkTopicAssignmentWithEpochs(topicId2, 10, 4, 5, 6),
mkTopicAssignmentWithEpochs(topicId2, 11, 7)))
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14123,9 +14148,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setState(MemberState.STABLE)
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName,
zarTopicName))
- .setAssignedPartitions(mkAssignment(
- mkTopicAssignment(fooTopicId, 0),
- mkTopicAssignment(zarTopicId, 0)))
+ .setAssignedPartitions(expectedAssignment)
Review Comment:
```suggestion
.setAssignedPartitions(mkAssignmentWithEpochs(
mkTopicAssignmentWithEpochs(fooTopicId, 10, 0),
mkTopicAssignmentWithEpochs(zarTopicId, 11, 0)))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org