[
https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Srivignesh updated KAFKA-16398:
-------------------------------
Description:
Based on custom predicate, our application is filtering messages during
mirroring.
When the HasHeader:test method of the predicate returns true (when it has to
drop messages from mirroring), it encounters below exceptions.
However when it returns false (the messages are forwarded for mirroring), it
works fine without OOM.
Note: This issue doesn't occur in version 2.8.0.
JVM heap size increased till 15G, but still OOM hits.
Exception stacktraces:
{code:java}
line java.lang.OutOfMemoryError: Java heap space
line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
line at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
line at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
line at
org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
line at
org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
line at
org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
line at
org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
line at
org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
line at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
line at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
line at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
line at
org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown
Source)
line at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
line at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
line at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
line at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
line at java.base/java.lang.Thread.run(Thread.java:840) {code}
{code:java}
line java.lang.OutOfMemoryError: Java heap space line at
java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}
was:
Based on custom predicate, our application is filtering messages during
mirroring.
When the HasHeader:test method of the predicate returns true (when it has to
drop messages from mirroring), it encounters below exceptions.
However when it returns false (the messages are forwarded for mirroring), it
works fine without OOM.
Note: This issue doesn't occur in version 2.8.0.
Exception stacktraces:
{code:java}
line java.lang.OutOfMemoryError: Java heap space
line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
line at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
line at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
line at
org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
line at
org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
line at
org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
line at
org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
line at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
line at
org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
line at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
line at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
line at
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
line at
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
line at
org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown
Source)
line at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
line at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
line at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
line at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
line at java.base/java.lang.Thread.run(Thread.java:840) {code}
{code:java}
line java.lang.OutOfMemoryError: Java heap space line at
java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}
> mirror-maker2 running into OOM while filtering (dropping) high number of
> messages
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-16398
> URL: https://issues.apache.org/jira/browse/KAFKA-16398
> Project: Kafka
> Issue Type: Bug
> Components: connect, mirrormaker
> Affects Versions: 3.6.1
> Reporter: Srivignesh
> Priority: Critical
>
> Based on custom predicate, our application is filtering messages during
> mirroring.
> When the HasHeader:test method of the predicate returns true (when it has to
> drop messages from mirroring), it encounters below exceptions.
> However when it returns false (the messages are forwarded for mirroring), it
> works fine without OOM.
> Note: This issue doesn't occur in version 2.8.0.
> JVM heap size increased till 15G, but still OOM hits.
> Exception stacktraces:
> {code:java}
> line java.lang.OutOfMemoryError: Java heap space
> line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
> line at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
> line at
> org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
> line at
> org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
> line at
> org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
> line at
> org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
> line at
> org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
> line at
> org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
> line at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
> line at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
> line at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
> line at
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
> line at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
> line at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
> line at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
> line at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
> line at
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
> line at
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
> line at
> org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown
> Source)
> line at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
> line at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> line at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> line at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> line at java.base/java.lang.Thread.run(Thread.java:840) {code}
> {code:java}
> line java.lang.OutOfMemoryError: Java heap space line at
> java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)