Alex Rovner created KAFKA-19107:
-----------------------------------
Summary: MirrorMaker2 repeatedly tries to sync group offsets for partitions that no longer exist
Key: KAFKA-19107
URL: https://issues.apache.org/jira/browse/KAFKA-19107
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.9.0
Reporter: Alex Rovner
h3. Summary
MM2's checkpoint connector (`MirrorCheckpointConnector`) is currently unaware of topic deletions and will repeatedly attempt to sync group offsets for deleted topics, which results in an `UnknownTopicOrPartitionException` being logged at every sync. The checkpoints used for this sync are persisted in an MM2-internal checkpoints topic. Because that topic is compacted, checkpoints that reference deleted topic-partitions are never cleaned up, so the `UnknownTopicOrPartitionException` recurs on every sync indefinitely until someone intervenes manually.
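The compaction argument above can be illustrated with a toy model. This sketch assumes a simplified compacted log in which each record is a key/value pair and a null value is a tombstone; the `CompactionDemo` class and the `group:topic-partition` key strings are hypothetical and do not reflect MM2's actual checkpoint serialization or Kafka's log cleaner. Compaction keeps only the latest value per key, so a checkpoint key for a deleted partition survives unless something writes a tombstone for it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a compacted topic (hypothetical, not Kafka internals). */
public class CompactionDemo {

    /** A log record; a null value models a tombstone. */
    public static final class Rec {
        final String key;
        final String value;

        public Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps the latest value per key and drops keys whose latest record is a tombstone. */
    public static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                latest.remove(r.key); // only a tombstone ever removes a key
            } else {
                latest.put(r.key, r.value); // newer value replaces older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        // Checkpoints written while source.synth-client-topic-0 still existed
        log.add(new Rec("foo:source.synth-client-topic-0", "offset=10"));
        log.add(new Rec("foo:source.synth-client-topic-0", "offset=42"));
        log.add(new Rec("foo:other-topic-0", "offset=7"));
        // The topic is then deleted, but no tombstone is ever written for its key
        System.out.println(compact(log));
        // prints {foo:source.synth-client-topic-0=offset=42, foo:other-topic-0=offset=7}
    }
}
```

The stale key is still present after compaction, which is why every restart reloads a checkpoint for a partition that no longer exists.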
Our current workaround is to change the cleanup policy (`cleanup.policy`) of the checkpoints topic to `compact,delete` or even `delete`, so that old checkpoints do not remain in the topic forever. However, this is not ideal because (correct me if I'm wrong) any cleanup would only take effect when the connector restarts, since that is when the contents of the checkpoints topic are loaded into the connector's internal state.
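One possible direction for a fix, sketched here purely as an illustration, would be to skip checkpointed partitions that no longer exist on the target cluster before altering a group's offsets. The `StaleCheckpointFilter` class and `retainExisting` method are hypothetical, partitions are plain `topic-partition` strings instead of Kafka's `TopicPartition`, and this is not `MirrorCheckpointTask` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical mitigation sketch; not MirrorMaker 2 code. */
public class StaleCheckpointFilter {

    /** Returns only the entries whose partition still exists on the target cluster. */
    public static Map<String, Long> retainExisting(Map<String, Long> checkpointedOffsets,
                                                   Set<String> existingPartitions) {
        return checkpointedOffsets.entrySet().stream()
                .filter(e -> existingPartitions.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("source.synth-client-topic-0", 42L); // topic has been deleted
        offsets.put("source.other-topic-0", 7L);
        // Assumption: in a real task this set would be built from the target cluster's metadata
        Set<String> existing = Set.of("source.other-topic-0");
        System.out.println(retainExisting(offsets, existing));
        // prints {source.other-topic-0=7}
    }
}
```

Filtering in memory like this would stop the repeated error without changing the checkpoints topic's cleanup policy, although the stale records would still remain in the topic itself.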
h3. How to reproduce
# Create 2 Kafka clusters (my-cluster and my-backup) with {{auto.create.topics.enable=false}}
# Create a MM2 instance and configure it to sync consumer group offsets
# Start a dummy client that continuously produces to some topic (e.g. `synth-client-topic`)
# Consume from `synth-client-topic` using `kafka-console-consumer` with
`--group foo`
# Consume from the checkpoints topic to make sure that some checkpoints have been created:
{{kafka-console-consumer.sh --bootstrap-server my-backup-kafka-bootstrap:9092 --topic source.checkpoints.internal --formatter "org.apache.kafka.connect.mirror.formatters.CheckpointFormatter" --from-beginning}}
# Shut down MM2 and consume some more from `synth-client-topic`. This is probably not strictly necessary, but we want to create a scenario where the groups in the two clusters are out of sync.
# Stop the dummy client. Delete the source topic (`synth-client-topic`) and the mirrored topic (`source.synth-client-topic`).
# Launch MM2 again. Observe that it is now logging errors like this one every
minute:
{quote}{{2025-04-07 12:25:34,203 ERROR [source->backup.MirrorCheckpointConnector|task-0] Unable to sync offsets for consumer group foo. (org.apache.kafka.connect.mirror.MirrorCheckpointTask) [kafka-admin-client-thread | source->backup|source->backup.MirrorCheckpointConnector-0|checkpoint-target-admin]}}
{{java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]}}
{{at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)}}
{{at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)}}
{{at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)}}
{{at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)}}
{{at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)}}
{{at org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)}}
{{at org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)}}
{{at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:100)}}
{{at java.base/java.util.Collections$SingletonMap.forEach(Collections.java:5061)}}
{{at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)}}
{{at org.apache.kafka.clients.admin.internals.AdminApiFuture$SimpleAdminApiFuture.complete(AdminApiFuture.java:96)}}
{{at org.apache.kafka.clients.admin.internals.AdminApiDriver.complete(AdminApiDriver.java:186)}}
{{at org.apache.kafka.clients.admin.internals.AdminApiDriver.onResponse(AdminApiDriver.java:230)}}
{{at org.apache.kafka.clients.admin.KafkaAdminClient$39.handleResponse(KafkaAdminClient.java:4843)}}
{{at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1397)}}
{{at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1550)}}
{{at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1473)}}
{{at java.base/java.lang.Thread.run(Thread.java:840)}}
{{Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Failed altering consumer group offsets for the following partitions: [source.synth-client-topic-0]}}{quote}