[ https://issues.apache.org/jira/browse/KAFKA-16398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Srivignesh updated KAFKA-16398:
-------------------------------
    Description: 
Our application filters messages during mirroring based on a custom 
predicate.

When the predicate's HasHeader:test method returns true (that is, when the 
messages must be dropped from mirroring), MirrorMaker 2 runs into the 
exceptions below.

However, when it returns false (the messages are forwarded for mirroring), 
it works fine without an OOM.

Note: This issue doesn't occur in version 2.8.0.
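
For reference, the true/false semantics described above can be sketched as follows. This is a minimal illustration only, assuming the predicate is paired with a Filter-style transformation that drops every record for which test returns true. The names Message, hasHeader, mirror, and the skip-mirror header key are hypothetical stand-ins; the sketch uses neither the Kafka Connect API nor the actual custom predicate, and it does not attempt to reproduce the OOM.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in types; this does not use the Kafka Connect API.
class FilterSemanticsSketch {

    // Minimal stand-in for a polled record with string headers.
    static final class Message {
        final Map<String, String> headers;
        final String value;

        Message(Map<String, String> headers, String value) {
            this.headers = headers;
            this.value = value;
        }
    }

    // Stand-in for the predicate's test method: true when the header key is present.
    static Predicate<Message> hasHeader(String key) {
        return m -> m.headers.containsKey(key);
    }

    // Forwards only the messages for which the drop predicate returns false.
    static List<Message> mirror(List<Message> polled, Predicate<Message> drop) {
        List<Message> forwarded = new ArrayList<>();
        for (Message m : polled) {
            if (!drop.test(m)) {
                forwarded.add(m);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, String> tagged = new HashMap<>();
        tagged.put("skip-mirror", "1"); // assumed header key, for illustration only

        List<Message> polled = new ArrayList<>();
        polled.add(new Message(tagged, "a"));          // test returns true: dropped
        polled.add(new Message(new HashMap<>(), "b")); // test returns false: forwarded

        List<Message> forwarded = mirror(polled, hasHeader("skip-mirror"));
        System.out.println(forwarded.size() + " of " + polled.size() + " forwarded");
    }
}
```

Running main prints "1 of 2 forwarded": the tagged message is dropped and the untagged one is mirrored.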

Exception stacktraces:
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
    at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}

  was:
Our application filters messages during mirroring based on a custom 
predicate.

When the predicate's HasHeader:test method returns false (that is, when the 
messages must be dropped from mirroring), MirrorMaker 2 runs into the 
exceptions below.

However, when it returns true (the messages are forwarded for mirroring), 
it works fine without an OOM.

Note: This issue doesn't occur in version 2.8.0.

Exception stacktraces:
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
    at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
    at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
{code:java}
java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
{code}


> mirror-maker2 running into OOM while filtering high number of messages
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-16398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16398
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.6.1
>            Reporter: Srivignesh
>            Priority: Critical
>
> Our application filters messages during mirroring based on a custom 
> predicate.
> When the predicate's HasHeader:test method returns true (that is, when the 
> messages must be dropped from mirroring), MirrorMaker 2 runs into the 
> exceptions below.
> However, when it returns false (the messages are forwarded for mirroring), 
> it works fine without an OOM.
> Note: This issue doesn't occur in version 2.8.0.
> Exception stacktraces:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:289)
>     at org.apache.kafka.common.utils.Utils.toArray(Utils.java:252)
>     at org.apache.kafka.common.utils.Utils.toNullableArray(Utils.java:270)
>     at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
>     at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:153)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
>     at org.apache.kafka.connect.runtime.isolation.Plugins$$Lambda$841/0x00007f55cc4c3d78.run(Unknown Source)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
