[ 
https://issues.apache.org/jira/browse/KAFKA-17244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870479#comment-17870479
 ] 

Jianbin Chen edited comment on KAFKA-17244 at 8/2/24 12:00 PM:
---------------------------------------------------------------

Hi [~kirktrue] ,
 # jdk21, Rocky linux 8.4, intel
 # This issue must occur after you invoke {{kafkaProducer#send}} in the virtual 
thread. You can add the {{-Djdk.tracePinnedThreads=full}} parameter in your 
test program to observe this phenomenon

Thanks for your attention to this issue

I think that this is closely related to the following code. If a synchronized 
lock is triggered in a virtual thread, the current virtual thread will be 
pinned.
{code:java}
                synchronized (dq) {
                    // After taking the lock, validate that the partition 
hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, 
nowMs, cluster))
                        continue;                    RecordAppendResult 
appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, 
value, headers, callbacks, buffer, nowMs);
                    // Set buffer to null, so that deallocate doesn't return it 
back to free pool, since it's used in the batch.
                    if (appendResult.newBatchCreated)
                        buffer = null;
                    // If queue has incomplete batches we disable switch (see 
comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                } {code}
Should we add a ReentrantLock to dq to replace synchronized?

 


was (Author: jianbin):
# jdk21, Rocky linux 8.4, intel
 # This issue must occur after you invoke {{kafkaProducer#send}} in the virtual 
thread. You can add the {{-Djdk.tracePinnedThreads=full}} parameter in your 
test program to observe this phenomenon

Thanks for your attention to this issue

I think that this is closely related to the following code. If a synchronized 
lock is triggered in a virtual thread, the current virtual thread will be 
pinned.
{code:java}
                synchronized (dq) {
                    // After taking the lock, validate that the partition 
hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, 
nowMs, cluster))
                        continue;                    RecordAppendResult 
appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, 
value, headers, callbacks, buffer, nowMs);
                    // Set buffer to null, so that deallocate doesn't return it 
back to free pool, since it's used in the batch.
                    if (appendResult.newBatchCreated)
                        buffer = null;
                    // If queue has incomplete batches we disable switch (see 
comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 
appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                } {code}
Should we add a ReentrantLock to dq to replace synchronized?

 

> java.base/java.lang.VirtualThread$VThreadContinuation.onPinned
> --------------------------------------------------------------
>
>                 Key: KAFKA-17244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17244
>             Project: Kafka
>          Issue Type: Wish
>          Components: clients, producer 
>    Affects Versions: 3.7.1
>            Reporter: Jianbin Chen
>            Priority: Major
>
> {code:java}
> Thread[#121,ForkJoinPool-1-worker-2,5,CarrierThreads]
> java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
> java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
> java.base/java.lang.VirtualThread.tryYield(VirtualThread.java:756)
> java.base/java.lang.Thread.yield(Thread.java:443)
> java.base/java.util.concurrent.ConcurrentHashMap.initTable(ConcurrentHashMap.java:2295)
> java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1017)
> java.base/java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1541)
> org.apache.kafka.common.record.CompressionRatioEstimator.getAndCreateEstimationIfAbsent(CompressionRatioEstimator.java:96)
> org.apache.kafka.common.record.CompressionRatioEstimator.estimation(CompressionRatioEstimator.java:59)
> org.apache.kafka.clients.producer.internals.ProducerBatch.<init>(ProducerBatch.java:95)
> org.apache.kafka.clients.producer.internals.ProducerBatch.<init>(ProducerBatch.java:83)
> org.apache.kafka.clients.producer.internals.RecordAccumulator.appendNewBatch(RecordAccumulator.java:399)
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:350)
>  <== monitors:1
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1025)
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947) 
> {code}
> Because there is synchronized in the {{RecordAccumulator.append}} method, 
> which causes the virtual thread to be {{{}onPinned{}}}, if this is considered 
> an optimization item, please assign it to me, and I will try to optimize the 
> problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to