George Yang created KAFKA-19046:
-----------------------------------
Summary: Change delete cleanup policy to compact cleanup policy
Key: KAFKA-19046
URL: https://issues.apache.org/jira/browse/KAFKA-19046
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.7.1
Environment: Kafka version: 3.7
mirrormaker2 version: 3.7.1
zk version: 3.6.8
Reporter: George Yang
The internal topics of MirrorMaker 2 (MM2) sometimes report the following error:
`
Uncaught exception in herder work thread, exiting:
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal'
supplied via the 'offset.storage.topic' property is required to have
'cleanup.policy=compact' to guarantee consistency and durability of source
connector offsets, but found the topic currently has 'cleanup.policy=delete'.
Continuing would likely result in eventually losing source connector offsets
and problems restarting this Connect cluster in the future. Change the
'offset.storage.topic' property in the Connect worker configurations to use a
topic with 'cleanup.policy=compact'.
at
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
at
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping
(org.apache.kafka.connect.mirror.MirrorMaker:208)
`
This results in the MM2 pods in the cluster entering a CrashLoopBackOff state
repeatedly. When changing the configuration via kafka-configs.sh, the process
runs fine. However, as we know, the default Kafka broker configuration for
log.cleanup.policy is set to delete, while the default cleanup policy for MM2
is set to compact. It appears that the policy for offset.storage.topic must be
compact, and similarly for status.storage and config.storage.
I want to configure the cleanup policy for these three topics to always be
compact. I attempted to configure them in connect-mirror-maker.properties as
shown below, but all attempts failed:
`
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact
`
or
`
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact
`
The logs show that the properties are unknown and report a failure in topic
creation:
`
Uncaught exception in herder work thread, exiting:
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s)
'mm2-offsets.cb.internal': Unknown topic config name:
topic.properties.cleanup.policy
at
org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
at
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
at
org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
at
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
at
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
at
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
at
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
at
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException:
Unknown topic config name: topic.properties.cleanup.policy
`
How can I configure the cleanup policy correctly? I reviewed the ticket
KAFKA-17101, which describes a similar issue as mentioned by @Kaushik
Srinivas.and @Anh Tuan Nguyen.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)