[ 
https://issues.apache.org/jira/browse/KAFKA-13611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Pörtner updated KAFKA-13611:
-------------------------------------
    Description: 
Because _knownConsumerGroups_ is stored in a variable and not queried every 
time _refreshConsumerGroups_ is executed, errors during task reconfiguration 
are not detected, and the reconfiguration is not retried until a new consumer 
group is added. This can lead to missing offset updates in the target cluster, 
because the affected consumer group is not picked up by a task until a 
completely new consumer group is added and the task reconfiguration succeeds.

 
{code:java}
    private void refreshConsumerGroups()
            throws InterruptedException, ExecutionException {
        List<String> consumerGroups = findConsumerGroups();
        Set<String> newConsumerGroups = new HashSet<>();
        newConsumerGroups.addAll(consumerGroups);
        newConsumerGroups.removeAll(knownConsumerGroups);
        Set<String> deadConsumerGroups = new HashSet<>();
        deadConsumerGroups.addAll(knownConsumerGroups);
        deadConsumerGroups.removeAll(consumerGroups);
        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
            log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                    consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
                    knownConsumerGroups.size());
            log.debug("Found new consumer groups: {}", newConsumerGroups);
            knownConsumerGroups = consumerGroups;
            context.requestTaskReconfiguration();
        }
    }{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]

For an example of how the problem can be triggered, see the following 
[issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].
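
The failure mode described above can be illustrated with a minimal, self-contained sketch. It is not the connector's code: the class GroupRefreshSketch, the Reconfigurer interface and the updateOnlyOnSuccess flag are hypothetical names introduced only for this illustration, and modelling a failed reconfiguration as a boolean return value is a simplifying assumption (in the connector, requestTaskReconfiguration() does not report the outcome to the caller). The second variant, which remembers the groups only after a successful reconfiguration, is one possible direction and not necessarily how the issue should be resolved.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GroupRefreshSketch {

    /** Hypothetical stand-in for the reconfiguration step; returns false on failure. */
    interface Reconfigurer {
        boolean reconfigure(Set<String> groups);
    }

    private Set<String> knownGroups = new HashSet<>();
    private final boolean updateOnlyOnSuccess;

    GroupRefreshSketch(boolean updateOnlyOnSuccess) {
        this.updateOnlyOnSuccess = updateOnlyOnSuccess;
    }

    /** Returns true if a reconfiguration was attempted during this refresh. */
    boolean refresh(List<String> found, Reconfigurer reconfigurer) {
        Set<String> current = new HashSet<>(found);
        if (current.equals(knownGroups)) {
            return false; // no new or removed groups, so nothing is attempted
        }
        if (!updateOnlyOnSuccess) {
            // Behaviour described in the issue: the groups are remembered
            // before the outcome of the reconfiguration is known.
            knownGroups = current;
            reconfigurer.reconfigure(current);
        } else if (reconfigurer.reconfigure(current)) {
            // Assumed alternative: remember the groups only after success.
            knownGroups = current;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("group-a");
        Reconfigurer alwaysFails = g -> false;

        GroupRefreshSketch cached = new GroupRefreshSketch(false);
        cached.refresh(groups, alwaysFails);
        System.out.println("cached retries: " + cached.refresh(groups, alwaysFails));

        GroupRefreshSketch guarded = new GroupRefreshSketch(true);
        guarded.refresh(groups, alwaysFails);
        System.out.println("guarded retries: " + guarded.refresh(groups, alwaysFails));
    }
}
```

With a reconfiguration that always fails, the first variant makes no second attempt for the same set of groups, while the second variant tries again on the next refresh.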

  was:
Because _knownConsumerGroups_ is stored in a variable and not queried every 
time _refreshConsumerGroups_ is executed, errors during task reconfiguration 
are not detected, and the reconfiguration is not retried until a new consumer 
group is added. This can lead to missing offset updates in the target cluster, 
because the affected consumer group is not picked up by a task until a 
completely new consumer group is added and the task reconfiguration succeeds.

 
{code:java}
    private void refreshConsumerGroups()
            throws InterruptedException, ExecutionException {
        List<String> consumerGroups = findConsumerGroups();
        Set<String> newConsumerGroups = new HashSet<>();
        newConsumerGroups.addAll(consumerGroups);
        newConsumerGroups.removeAll(knownConsumerGroups);
        Set<String> deadConsumerGroups = new HashSet<>();
        deadConsumerGroups.addAll(knownConsumerGroups);
        deadConsumerGroups.removeAll(consumerGroups);
        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
            log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                    consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
                    knownConsumerGroups.size());
            log.debug("Found new consumer groups: {}", newConsumerGroups);
            knownConsumerGroups = consumerGroups;
            context.requestTaskReconfiguration();
        }
    }{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]

For an example of how the problem can be triggered, see the following 
[issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].


> Failed reconfiguration of tasks can cause missing offset replications in 
> MirrorCheckpointConnector
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13611
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13611
>             Project: Kafka
>          Issue Type: Improvement
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 
> 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Aljoscha Pörtner
>            Priority: Major
>
> Because _knownConsumerGroups_ is stored in a variable and not queried every 
> time _refreshConsumerGroups_ is executed, errors during task reconfiguration 
> are not detected, and the reconfiguration is not retried until a new consumer 
> group is added. This can lead to missing offset updates in the target 
> cluster, because the affected consumer group is not picked up by a task until 
> a completely new consumer group is added and the task reconfiguration 
> succeeds.
>  
> {code:java}
>     private void refreshConsumerGroups()
>             throws InterruptedException, ExecutionException {
>         List<String> consumerGroups = findConsumerGroups();
>         Set<String> newConsumerGroups = new HashSet<>();
>         newConsumerGroups.addAll(consumerGroups);
>         newConsumerGroups.removeAll(knownConsumerGroups);
>         Set<String> deadConsumerGroups = new HashSet<>();
>         deadConsumerGroups.addAll(knownConsumerGroups);
>         deadConsumerGroups.removeAll(consumerGroups);
>         if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
>             log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
>                     consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
>                     knownConsumerGroups.size());
>             log.debug("Found new consumer groups: {}", newConsumerGroups);
>             knownConsumerGroups = consumerGroups;
>             context.requestTaskReconfiguration();
>         }
>     }{code}
> [Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]
> For an example of how the problem can be triggered, see the following 
> [issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
