[
https://issues.apache.org/jira/browse/KAFKA-15894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chia-Ping Tsai resolved KAFKA-15894.
------------------------------------
Fix Version/s: 4.3.0
Resolution: Fixed
> MessageConversionsTimeMs and TemporaryMemoryBytes may not be recorded
> correctly.
> --------------------------------------------------------------------------------
>
> Key: KAFKA-15894
> URL: https://issues.apache.org/jira/browse/KAFKA-15894
> Project: Kafka
> Issue Type: Bug
> Components: core, metrics
> Reporter: Minoru Tomioka
> Assignee: Minoru Tomioka
> Priority: Major
> Fix For: 4.3.0
>
>
> {{KafkaApis.updateRecordConversionStats}} may be called multiple times for a
> request, and {{request.messageConversionsTimeNanos}} and
> {{request.temporaryMemoryBytes}} are overwritten in this method.
> [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779]
> {code:java}
> private def updateRecordConversionStats(request: RequestChannel.Request,
> tp: TopicPartition,
> conversionStats:
> RecordValidationStats): Unit = {
> ...
> request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
> }
> request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
> }{code}
>
> [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708]
> {code:java}
> def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
> processingStats.forKeyValue { (tp, info) =>
> updateRecordConversionStats(request, tp, info)
> }
> }{code}
>
> So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes are not
> recorded correctly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)