[https://issues.apache.org/jira/browse/KAFKA-13611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Aljoscha Pörtner updated KAFKA-13611:
-------------------------------------
Description:
Because _knownConsumerGroups_ is cached in a field instead of being re-queried
every time _refreshConsumerGroups_ runs, errors during task reconfiguration go
unnoticed, and the reconfiguration is not retried until the set of consumer
groups changes again (for example, when a new consumer group is added). This can
lead to missing offset updates in the target cluster: the affected consumer group
is not picked up by any task until a completely new consumer group appears and
the subsequent task reconfiguration succeeds.
{code:java}
private void refreshConsumerGroups()
        throws InterruptedException, ExecutionException {
    List<String> consumerGroups = findConsumerGroups();
    Set<String> newConsumerGroups = new HashSet<>();
    newConsumerGroups.addAll(consumerGroups);
    newConsumerGroups.removeAll(knownConsumerGroups);
    Set<String> deadConsumerGroups = new HashSet<>();
    deadConsumerGroups.addAll(knownConsumerGroups);
    deadConsumerGroups.removeAll(consumerGroups);
    if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
        log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                consumerGroups.size(), sourceAndTarget,
                newConsumerGroups.size(), deadConsumerGroups.size(),
                knownConsumerGroups.size());
        log.debug("Found new consumer groups: {}", newConsumerGroups);
        // The cache is overwritten before the reconfiguration is known to have succeeded,
        // so a failed reconfiguration leaves no difference to detect on the next refresh.
        knownConsumerGroups = consumerGroups;
        context.requestTaskReconfiguration();
    }
}{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]
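To make the failure mode concrete, here is a minimal, self-contained model of the logic quoted above. It is a sketch under stated assumptions, not the real _MirrorCheckpointConnector_: the class {{StaleCacheDemo}}, the {{failNextReconfiguration}} flag and the {{groupsAssignedToTasks}} set are hypothetical stand-ins for the Connect runtime, which in this model drops a reconfiguration request without reporting an error. Because {{knownConsumerGroups}} is overwritten before the outcome is known, a second refresh with an unchanged group list sees no difference and never retries.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of refreshConsumerGroups(). Kafka Connect types are
 * replaced by plain fields so the stale-cache behaviour can be run in isolation.
 */
public class StaleCacheDemo {
    // Mirrors the connector's cached knownConsumerGroups field.
    private List<String> knownConsumerGroups = new ArrayList<>();
    // Hypothetical stand-in for the groups the tasks are actually configured with.
    private final Set<String> groupsAssignedToTasks = new HashSet<>();
    // When true, the next simulated reconfiguration is silently lost.
    boolean failNextReconfiguration = false;
    int reconfigurationRequests = 0;

    void refreshConsumerGroups(List<String> consumerGroups) {
        Set<String> newGroups = new HashSet<>(consumerGroups);
        newGroups.removeAll(knownConsumerGroups);
        Set<String> deadGroups = new HashSet<>(knownConsumerGroups);
        deadGroups.removeAll(consumerGroups);
        if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
            // Same ordering as the original: update the cache, then request reconfiguration.
            knownConsumerGroups = consumerGroups;
            requestTaskReconfiguration(consumerGroups);
        }
    }

    // Simulated runtime: either applies the new assignment or drops it without an error.
    private void requestTaskReconfiguration(List<String> groups) {
        reconfigurationRequests++;
        if (failNextReconfiguration) {
            failNextReconfiguration = false; // failure is never reported back to the connector
            return;
        }
        groupsAssignedToTasks.clear();
        groupsAssignedToTasks.addAll(groups);
    }

    Set<String> assigned() {
        return groupsAssignedToTasks;
    }

    public static void main(String[] args) {
        StaleCacheDemo demo = new StaleCacheDemo();
        demo.failNextReconfiguration = true;
        demo.refreshConsumerGroups(List.of("group-a")); // request is lost
        demo.refreshConsumerGroups(List.of("group-a")); // no diff against the cache, no retry
        System.out.println(demo.reconfigurationRequests + " " + demo.assigned());
    }
}
{code}

Running {{main}} prints {{1 []}}: one reconfiguration was requested and no task ever received {{group-a}}. Only a later change to the group list (for example adding {{group-b}}) produces a new diff and repairs the lost assignment.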
For an example of how the problem can be triggered, see the following
[issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].
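One possible direction, suggested by the observation that _knownConsumerGroups_ is not re-queried, is to compare the discovered groups on every refresh against the state the tasks actually run with, instead of against an optimistically updated cache. The sketch below assumes that state is available through a hypothetical {{Supplier<Set<String>>}}; how the real connector would obtain it is not specified here, and {{RetryingRefreshDemo}} is an illustrative name, not a Kafka class.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Hypothetical mitigation sketch: diff against the groups the tasks currently report
 * (assumed to be obtainable via a Supplier) so a lost reconfiguration is retried.
 */
public class RetryingRefreshDemo {
    private final Supplier<Set<String>> assignedGroups;   // hypothetical source of the tasks' real assignment
    private final Runnable requestTaskReconfiguration;    // stand-in for context.requestTaskReconfiguration()

    RetryingRefreshDemo(Supplier<Set<String>> assignedGroups, Runnable requestTaskReconfiguration) {
        this.assignedGroups = assignedGroups;
        this.requestTaskReconfiguration = requestTaskReconfiguration;
    }

    /** Returns true if a reconfiguration was requested on this refresh. */
    boolean refreshConsumerGroups(List<String> consumerGroups) {
        Set<String> discovered = new HashSet<>(consumerGroups);
        if (discovered.equals(assignedGroups.get())) {
            return false; // tasks already match the discovered groups
        }
        requestTaskReconfiguration.run();
        return true;
    }

    public static void main(String[] args) {
        Set<String> tasksHave = new HashSet<>();
        int[] attempts = {0};
        RetryingRefreshDemo demo = new RetryingRefreshDemo(
                () -> tasksHave,
                () -> {
                    attempts[0]++;
                    if (attempts[0] > 1) {
                        tasksHave.add("group-a"); // the first attempt is simulated as lost
                    }
                });
        demo.refreshConsumerGroups(List.of("group-a")); // lost
        demo.refreshConsumerGroups(List.of("group-a")); // retried, succeeds
        demo.refreshConsumerGroups(List.of("group-a")); // in sync, no request
        System.out.println(attempts[0] + " " + tasksHave);
    }
}
{code}

{{main}} prints {{2 [group-a]}}: the lost first request is retried on the next refresh, and the third refresh is a no-op once tasks and discovered groups agree. Because real task reconfiguration is asynchronous, a production version would also have to tolerate repeated requests while an earlier one is still pending.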
> Failed reconfiguration of tasks can cause missing offset replications in MirrorCheckpointConnector
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13611
> URL: https://issues.apache.org/jira/browse/KAFKA-13611
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0,
> 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0
> Reporter: Aljoscha Pörtner
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.1#820001)