[
https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350606#comment-17350606
]
Ryanne Dolan commented on KAFKA-12838:
--------------------------------------
Would it help to significantly increase the number of partitions you're writing
to?
> Kafka Broker - Request threads inefficiently blocking during produce
> --------------------------------------------------------------------
>
> Key: KAFKA-12838
> URL: https://issues.apache.org/jira/browse/KAFKA-12838
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.7.0, 2.8.0
> Reporter: Ryan Cabral
> Priority: Major
>
> Hello, I have been using Kafka brokers for a while and have run into a problem
> with the way a Kafka broker handles produce requests. If there are multiple
> producers writing to the same topic and partition, any request handler thread
> handling a produce for that topic and partition blocks until all requests
> ahead of it have completed. The broker's entire pool of request handler
> threads can become exhausted waiting on the same partition lock, blocking
> requests for other partitions that do not need that lock.
> Once that starts happening, requests back up, the request queue can reach its
> maximum size, and network threads begin to be paused, cascading the problem
> further. Overall performance ends up degraded. I'm not as focused on the
> cascade at the moment as on the initial contention. Intuitively, I would
> expect lock contention on a single partition to affect throughput ONLY on
> that partition, not on the entire broker.
>
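The shared-pool starvation described above can be illustrated with a toy model (hypothetical names, not Kafka code): a fixed pool of "request handler" threads and a single per-partition lock, standing in for the lock in Log.append. Appends to the hot partition serialize on the lock, and each waiting append occupies a pool thread the whole time it waits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the contention (not Kafka code): a shared pool of handler
// threads and one per-partition lock. Only one append proceeds at a time;
// the rest block on the monitor while still holding a pool thread.
public class HotPartitionContention {
    static final Object hotPartitionLock = new Object(); // stands in for Log's lock object

    static void produceToHotPartition() {
        synchronized (hotPartitionLock) { // only one append proceeds at a time
            try {
                Thread.sleep(50); // simulated append work under the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService handlers = Executors.newFixedThreadPool(4); // like num.io.threads
        long start = System.nanoTime();
        for (int i = 0; i < 4; i++) {
            handlers.submit(HotPartitionContention::produceToHotPartition);
        }
        handlers.shutdown();
        handlers.awaitTermination(10, TimeUnit.SECONDS);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Four 50 ms appends on four threads still take roughly 4 x 50 ms:
        // they ran serially, and each waiter tied up a handler thread.
        System.out.println("appends serialized: " + (elapsedMs >= 150));
    }
}
```

With four handler threads and four concurrent appends to one partition, the whole pool is tied up even though only one thread is doing useful work.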
> The append call within the request handler originates here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
> Further down the stack, the lock guarding the append is acquired here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
> At this point the first request holds the lock for the duration of its
> append, and subsequent requests on the same partition block waiting for the
> lock, each tying up an io thread (request handler).
> At first glance, it seems like it would make the most sense (via config?) to
> funnel produce requests for the same partition through a per-partition
> request queue of sorts and dispatch them such that at most one io thread is
> tied up at a time for any given partition. There are a number of other
> reasons the lock could be held, so this would only mitigate the issue, but it
> should at least help. I'm assuming this is easier said than done and likely
> requires significant refactoring to achieve properly, but I'm hoping it is
> something that could end up on some sort of long-term roadmap.
>
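The funnel idea above could be sketched roughly as follows (hypothetical names, not Kafka's API): each partition gets a FIFO queue that is drained by at most one pool task at a time, so a hot partition ties up at most one thread from the shared handler pool and the rest stay free for other partitions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a per-partition funnel (illustrative, not Kafka code): appends
// for a partition go into that partition's queue, and a drain task is
// scheduled only when the queue transitions from empty to non-empty, so at
// most one pool thread ever works on a given partition.
public class PartitionDispatcher {
    private final ExecutorService pool;
    private final ConcurrentHashMap<Integer, Deque<Runnable>> queues = new ConcurrentHashMap<>();

    PartitionDispatcher(ExecutorService pool) { this.pool = pool; }

    void submit(int partition, Runnable append) {
        Deque<Runnable> q = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        boolean startDrain;
        synchronized (q) {
            q.addLast(append);
            startDrain = q.size() == 1; // no drain currently in flight for this partition
        }
        if (startDrain) pool.execute(() -> drain(q));
    }

    private void drain(Deque<Runnable> q) {
        while (true) {
            Runnable next;
            synchronized (q) { next = q.peekFirst(); }
            next.run(); // at most one thread runs appends for this partition
            synchronized (q) {
                q.removeFirst(); // remove after running so size() counts the in-flight append
                if (q.isEmpty()) return; // a later submit will schedule a fresh drain
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        PartitionDispatcher dispatcher = new PartitionDispatcher(pool);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            dispatcher.submit(0, () -> { // six appends to the same hot partition
                int c = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(c, Math::max);
                try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                inFlight.decrementAndGet();
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        System.out.println("max threads on hot partition: " + maxInFlight.get()); // → 1
    }
}
```

This is the classic serial-executor-per-key pattern; the real broker change would be far more involved (ordering, purgatory, error propagation), but it shows how a hot partition need not consume more than one io thread.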
> Snippet from jstack. Almost all request handler threads (there are 256 of
> them, up from 25 to mitigate the issue) in the jstack are blocked waiting on
> the same lock due to the number of producers we have.
>
> {noformat}
> "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
>         at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
>         at org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
>         at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
>         at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
>         at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
>         at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
>         at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
>         at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
>         at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
>         - locked <0x00000004c9a6fd60> (a java.lang.Object)
>         at kafka.log.Log.append(Log.scala:2387)
>         at kafka.log.Log.appendAsLeader(Log.scala:1050)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>         at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>         at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
>         - waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
>         at kafka.log.Log.append(Log.scala:2387)
>         at kafka.log.Log.appendAsLeader(Log.scala:1050)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>         at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>         at java.lang.Thread.run(Thread.java:748){noformat}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)