[
https://issues.apache.org/jira/browse/KAFKA-17244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870479#comment-17870479
]
Jianbin Chen edited comment on KAFKA-17244 at 8/2/24 12:00 PM:
---------------------------------------------------------------
Hi [~kirktrue],
# JDK 21, Rocky Linux 8.4, Intel
# This issue reliably occurs once you invoke {{KafkaProducer#send}} from a virtual thread. You can add the {{-Djdk.tracePinnedThreads=full}} JVM option to your test program to observe the pinning.
Thanks for your attention to this issue.
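To make the pinning mechanism concrete, here is a minimal, hypothetical reproducer (not Kafka code, requires JDK 21): a virtual thread that blocks while holding a monitor cannot unmount from its carrier thread. Running it with {{-Djdk.tracePinnedThreads=full}} prints the pinned stack trace, just as in the producer case.

```java
// Hypothetical reproducer: blocking inside a synchronized block pins a
// virtual thread to its carrier (JDK 21). Run with
// -Djdk.tracePinnedThreads=full to see the pinned stack reported.
public class PinnedDemo {
    private static final Object lock = new Object();

    static String blockWhilePinned() {
        Thread vt = Thread.ofVirtual().start(() -> {
            synchronized (lock) {            // virtual thread now holds a monitor
                try {
                    Thread.sleep(50);        // blocking here pins the carrier thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            vt.join();                       // wait for the virtual thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    public static void main(String[] args) {
        System.out.println(PinnedDemo.blockWhilePinned());
    }
}
```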
I think this is closely related to the following code: when a virtual thread blocks while holding a {{synchronized}} monitor, it is pinned to its carrier thread.
{code:java}
synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;

    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq,
        timestamp, key, value, headers, callbacks, buffer, nowMs);
    // Set buffer to null, so that deallocate doesn't return it back to free pool,
    // since it's used in the batch.
    if (appendResult.newBatchCreated)
        buffer = null;
    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
    boolean enableSwitch = allBatchesFull(dq);
    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo,
        appendResult.appendedBytes, cluster, enableSwitch);
    return appendResult;
} {code}
Should we replace the {{synchronized}} block on {{dq}} with a {{ReentrantLock}}?
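As a sketch of what that change could look like (this is not the actual Kafka code, just a hypothetical stand-in for the per-partition deque): a {{ReentrantLock}} uses {{LockSupport}} parking, so a virtual thread waiting for the lock can unmount instead of pinning its carrier.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: guard the deque with a ReentrantLock instead of synchronized.
// A virtual thread that blocks on lock() parks and unmounts rather than
// pinning its carrier thread.
public class LockedDeque<T> {
    private final Deque<T> dq = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();

    public void append(T batch) {
        lock.lock();          // virtual-thread friendly acquisition
        try {
            dq.addLast(batch);
        } finally {
            lock.unlock();    // always release in finally
        }
    }

    public T pollFirst() {
        lock.lock();
        try {
            return dq.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

The trade-off is that every {{synchronized (dq)}} call site would need the explicit lock()/unlock() discipline shown above, and the lock object would have to live alongside (or wrap) each deque.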
> java.base/java.lang.VirtualThread$VThreadContinuation.onPinned
> --------------------------------------------------------------
>
> Key: KAFKA-17244
> URL: https://issues.apache.org/jira/browse/KAFKA-17244
> Project: Kafka
> Issue Type: Wish
> Components: clients, producer
> Affects Versions: 3.7.1
> Reporter: Jianbin Chen
> Priority: Major
>
> {code:java}
> Thread[#121,ForkJoinPool-1-worker-2,5,CarrierThreads]
> java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
> java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
> java.base/java.lang.VirtualThread.tryYield(VirtualThread.java:756)
> java.base/java.lang.Thread.yield(Thread.java:443)
> java.base/java.util.concurrent.ConcurrentHashMap.initTable(ConcurrentHashMap.java:2295)
> java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1017)
> java.base/java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1541)
> org.apache.kafka.common.record.CompressionRatioEstimator.getAndCreateEstimationIfAbsent(CompressionRatioEstimator.java:96)
> org.apache.kafka.common.record.CompressionRatioEstimator.estimation(CompressionRatioEstimator.java:59)
> org.apache.kafka.clients.producer.internals.ProducerBatch.<init>(ProducerBatch.java:95)
> org.apache.kafka.clients.producer.internals.ProducerBatch.<init>(ProducerBatch.java:83)
> org.apache.kafka.clients.producer.internals.RecordAccumulator.appendNewBatch(RecordAccumulator.java:399)
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:350)
> <== monitors:1
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1025)
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
> {code}
> Because the {{RecordAccumulator.append}} method uses {{synchronized}}, virtual
> threads calling it are pinned ({{onPinned}}). If this is considered worth
> optimizing, please assign it to me and I will try to fix it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)