Aljoscha Pörtner created KAFKA-13611:
----------------------------------------
Summary: Failed reconfiguration of tasks can cause missing offset
replications in MirrorCheckpointConnector
Key: KAFKA-13611
URL: https://issues.apache.org/jira/browse/KAFKA-13611
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0,
2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.4.0
Reporter: Aljoscha Pörtner
Because _knownConsumerGroups_ is stored in a variable and not queried every
time _refreshConsumerGroups_ is executed, errors during task reconfiguration
go unnoticed, and the reconfiguration is not retried until a new consumer
group is added. This can lead to missing offset updates in the target cluster,
because the affected consumer group is not picked up by a task until a
completely new consumer group is added and the task reconfiguration succeeds.
{code:java}
private void refreshConsumerGroups()
        throws InterruptedException, ExecutionException {
    List<String> consumerGroups = findConsumerGroups();
    Set<String> newConsumerGroups = new HashSet<>();
    newConsumerGroups.addAll(consumerGroups);
    newConsumerGroups.removeAll(knownConsumerGroups);
    Set<String> deadConsumerGroups = new HashSet<>();
    deadConsumerGroups.addAll(knownConsumerGroups);
    deadConsumerGroups.removeAll(consumerGroups);
    if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
        log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(),
                deadConsumerGroups.size(), knownConsumerGroups.size());
        log.debug("Found new consumer groups: {}", newConsumerGroups);
        knownConsumerGroups = consumerGroups;
        context.requestTaskReconfiguration();
    }
}
{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]
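The ordering problem can be reproduced in isolation. The following is a minimal, self-contained sketch, not the connector's actual code: the class RefreshSketch, the Reconfigurer interface, the updateCacheFirst flag and the helper attemptsAfterOneFailure are all hypothetical names introduced for illustration. It also assumes that a failed reconfiguration surfaces as an exception at the call site, which may not be how the Connect runtime reports such failures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RefreshSketch {
    /** Hypothetical stand-in for context.requestTaskReconfiguration(). */
    interface Reconfigurer {
        void requestTaskReconfiguration();
    }

    private List<String> knownConsumerGroups = new ArrayList<>();
    private final Reconfigurer reconfigurer;
    private final boolean updateCacheFirst;

    RefreshSketch(Reconfigurer reconfigurer, boolean updateCacheFirst) {
        this.reconfigurer = reconfigurer;
        this.updateCacheFirst = updateCacheFirst;
    }

    /** Returns true if a reconfiguration was requested during this refresh. */
    boolean refresh(List<String> consumerGroups) {
        Set<String> newGroups = new HashSet<>(consumerGroups);
        newGroups.removeAll(knownConsumerGroups);
        Set<String> deadGroups = new HashSet<>(knownConsumerGroups);
        deadGroups.removeAll(consumerGroups);
        if (newGroups.isEmpty() && deadGroups.isEmpty()) {
            return false; // nothing changed relative to the cached list
        }
        if (updateCacheFirst) {
            // Ordering described in the issue: the cache is overwritten
            // before the outcome of the reconfiguration is known.
            knownConsumerGroups = consumerGroups;
            reconfigurer.requestTaskReconfiguration();
        } else {
            // Assumed alternative: update the cache only if the request
            // returned without throwing.
            reconfigurer.requestTaskReconfiguration();
            knownConsumerGroups = consumerGroups;
        }
        return true;
    }

    /** Runs two refreshes where the first reconfiguration attempt fails. */
    static int attemptsAfterOneFailure(boolean updateCacheFirst) {
        int[] attempts = {0};
        RefreshSketch sketch = new RefreshSketch(() -> {
            attempts[0]++;
            if (attempts[0] == 1) {
                throw new IllegalStateException("simulated reconfiguration failure");
            }
        }, updateCacheFirst);
        List<String> groups = Collections.singletonList("group-a");
        for (int i = 0; i < 2; i++) {
            try {
                sketch.refresh(groups);
            } catch (IllegalStateException e) {
                // Swallowed here, the way a failed periodic refresh would be.
            }
        }
        return attempts[0];
    }

    public static void main(String[] args) {
        System.out.println("cache updated first: " + attemptsAfterOneFailure(true) + " attempt(s)");
        System.out.println("cache updated after: " + attemptsAfterOneFailure(false) + " attempt(s)");
    }
}
```

With the cache updated first, the second refresh sees no difference and never retries, so only one attempt is made. Updating the cache after the request returns lets the next refresh retry. Whether this ordering is a suitable fix for MirrorCheckpointConnector depends on how reconfiguration failures are actually reported.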
For an example of how the problem can be triggered, see the following
[issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].
--
This message was sent by Atlassian Jira
(v8.20.1#820001)