[
https://issues.apache.org/jira/browse/KAFKA-19274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Jacot updated KAFKA-19274:
--------------------------------
Description:
Group Coordinator Shards are not unloaded when the __consumer_offsets topic is
deleted. The unloading is scheduled, but it is ignored because the requested
epoch is equal to the current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
{noformat}
The issue seems to be in this code:
{code:java}
// Handle the case where the topic was deleted
Option(delta.topicsDelta()).foreach { topicsDelta =>
  if (topicsDelta.topicWasDeleted(topicName)) {
    topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
      if (entry.getValue.leader == brokerId) {
        resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
      }
    }
  }
}

// Handle the case where the replica was reassigned, made a leader or made a follower
getTopicDelta(topicName, image, delta).foreach { topicDelta =>
  val changes = topicDelta.localChanges(brokerId)
  changes.deletes.forEach { topicPartition =>
    resignation(topicPartition.partition, None)
  }
  changes.electedLeaders.forEach { (topicPartition, partitionInfo) =>
    election(topicPartition.partition, partitionInfo.partition.leaderEpoch)
  }
  changes.followers.forEach { (topicPartition, partitionInfo) =>
    resignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
  }
}
{code}
We pass `None` when a partition is deleted, but the actual leader epoch when
the topic is deleted. Because the leader epoch is not incremented when the
topic is deleted, the requested epoch equals the current epoch and the
unloading logic does not accept the resignation. We should use `None` in both
cases.
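
To illustrate why the resignation is ignored, here is a minimal sketch of the kind of epoch guard that the log messages above suggest. It is not the actual CoordinatorRuntime code: the class `UnloadEpochSketch` and the method `shouldUnload` are hypothetical names, and the comparison is an assumption inferred from the "Ignored unloading ... since current epoch is 0" message.

```java
import java.util.OptionalInt;

public class UnloadEpochSketch {
    // Hypothetical stand-in for the runtime's guard (assumption, not the real code):
    // unload when no epoch is supplied, or when the supplied epoch is strictly
    // newer than the epoch the coordinator currently holds.
    static boolean shouldUnload(int currentEpoch, OptionalInt requestedEpoch) {
        return !requestedEpoch.isPresent() || currentEpoch < requestedEpoch.getAsInt();
    }

    public static void main(String[] args) {
        // Topic deletion today: the leader epoch is not bumped, so the requested
        // epoch equals the current epoch and the request is ignored.
        System.out.println(shouldUnload(0, OptionalInt.of(0)));   // false

        // Proposed behaviour: pass no epoch (Scala `None`), which always unloads.
        System.out.println(shouldUnload(0, OptionalInt.empty())); // true

        // Becoming a follower: the leader epoch was bumped, so the guard accepts it.
        System.out.println(shouldUnload(0, OptionalInt.of(1)));   // true
    }
}
```

Under this assumed guard, an empty `OptionalInt` plays the role of `None` in the Scala snippet, which is why using `None` for topic deletion would let the unloading proceed.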
was:
Group Coordinator Shards are not unloaded when __consumer_offsets topic is
deleted. The unloading is scheduled but it is ignored because the epoch is
equal to the current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
{noformat}
> Group Coordinator Shards are not unloaded when __consumer_offsets topic is
> deleted
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-19274
> URL: https://issues.apache.org/jira/browse/KAFKA-19274
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 4.0.0
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
>