Srivignesh created KAFKA-16398:
----------------------------------
Summary: mirror-maker2 running into OOM while filtering high
number of messages
Key: KAFKA-16398
URL: https://issues.apache.org/jira/browse/KAFKA-16398
Project: Kafka
Issue Type: Bug
Components: connect, mirrormaker
Affects Versions: 3.6.1
Reporter: Srivignesh
Our application uses a custom predicate to filter messages during
mirroring.
When the predicate's HasHeader:test method returns false (that is, when the
message must be dropped from mirroring), MirrorMaker 2 fails with the
exceptions below.
When it returns true (the message is forwarded for mirroring), mirroring
works fine and does not run out of memory.
Note: This issue does not occur in version 2.8.0.
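For context, a filter of this kind is usually wired into Kafka Connect
through the Filter transformation and a predicate. The fragment below is
only a sketch, not the configuration used in this report: the aliases
dropUnmarked and hasHeader and the class name com.example.HasHeader are
hypothetical, and negate=true is an assumption chosen so that a record is
dropped when test returns false, as described above. Where these keys go in
a MirrorMaker 2 setup depends on how it is deployed.
{code}
# Hypothetical aliases and class name; not taken from the report.
transforms=dropUnmarked
transforms.dropUnmarked.type=org.apache.kafka.connect.transforms.Filter
transforms.dropUnmarked.predicate=hasHeader
# Assumption: negate=true, so Filter drops a record when test() returns false.
transforms.dropUnmarked.negate=true
predicates=hasHeader
predicates.hasHeader.type=com.example.HasHeader
{code}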
Exception stack traces:
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
    at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}