chia7712 commented on code in PR #16828:
URL: https://github.com/apache/kafka/pull/16828#discussion_r1711588078
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -677,6 +705,73 @@ private void maybeRemovePartitionEpoch(
}
}
+ /**
+ * Removes the partition epochs based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param expectedEpoch The expected epoch.
+ * @throws IllegalStateException if the epoch does not match the expected one.
+ * package-private for testing.
+ */
+ void removePartitionEpochs(
+ Map<Uuid, Set<Integer>> assignment,
+ int expectedEpoch
+ ) {
+ assignment.forEach((topicId, assignedPartitions) -> {
+ currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull != null) {
+ assignedPartitions.forEach(partitionId -> {
+ Integer prevValue = partitionsOrNull.remove(partitionId);
+ if (prevValue != expectedEpoch) {
+ throw new IllegalStateException(
+ String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
+ "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
+ }
+ });
+ if (partitionsOrNull.isEmpty()) {
+ return null;
+ } else {
+ return partitionsOrNull;
+ }
+ } else {
+ throw new IllegalStateException(
+ String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
+ expectedEpoch, topicId));
+ }
+ });
+ });
+ }
+
+ /**
+ * Adds the partitions epoch based on the provided assignment.
+ *
+ * @param assignment The assignment.
+ * @param epoch The new epoch.
+ * @throws IllegalStateException if the partition already has an epoch assigned.
+ * package-private for testing.
+ */
+ void addPartitionEpochs(
+ Map<Uuid, Set<Integer>> assignment,
+ int epoch
+ ) {
+ assignment.forEach((topicId, assignedPartitions) -> {
+ currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
+ if (partitionsOrNull == null) {
+ partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
+ }
+ for (Integer partitionId : assignedPartitions) {
+ Integer prevValue = partitionsOrNull.put(partitionId, epoch);
Review Comment:
> Yes, that's right. But it's a soft update which gets updated when CoordinatorRuntime either does rollback or commit. So I don't think it should concern us.
Thanks for the explanation. I overlooked the `SnapshotRegistry` :(
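For anyone else reading this thread, here is a toy sketch of why a partial `put` is harmless when the map is backed by snapshots. This is not Kafka's `TimelineHashMap` or `SnapshotRegistry`; `SoftStateMap` and its methods are hypothetical names, and the copy-on-snapshot approach is a simplification chosen only to show the rollback-or-commit idea.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a snapshot-backed map (NOT Kafka's TimelineHashMap).
final class SoftStateMap {
    private Map<Integer, Integer> current = new HashMap<>();
    private final Deque<Map<Integer, Integer>> snapshots = new ArrayDeque<>();

    // Save a copy of the state before a write operation starts.
    void snapshot() {
        snapshots.push(new HashMap<>(current));
    }

    // Returns the previous epoch for the partition, or null if there was none.
    Integer put(int partitionId, int epoch) {
        return current.put(partitionId, epoch);
    }

    Integer get(int partitionId) {
        return current.get(partitionId);
    }

    // The operation failed: discard every change made since the last snapshot.
    void rollback() {
        current = snapshots.pop();
    }

    // The operation succeeded: keep the changes and drop the saved copy.
    void commit() {
        snapshots.pop();
    }
}
```

With this model, a `put` that overwrites an epoch and then throws leaves no trace once `rollback()` runs, which I take to be the point of the quoted explanation.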