This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2b240b43dd MINOR: Revise Topic Resolving Logic in OffsetCommit 
Request to Prevent Race Condition (#21809)
b2b240b43dd is described below

commit b2b240b43dd5f87f3d38794f8bda9131964fd9b0
Author: Lucy Liu <[email protected]>
AuthorDate: Wed Mar 18 14:32:30 2026 -0500

    MINOR: Revise Topic Resolving Logic in OffsetCommit Request to Prevent Race 
Condition (#21809)
    
    This patch slightly revises the logic to resolve topicId in an offset
    commit request. It avoids setting topicId twice if `topic.topicId`
    already exists, avoiding race condition in between. This is a follow-up
    of https://github.com/apache/kafka/pull/21692
    
    Reviewers: David Jacot <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 54c6395b5f1..6745cb0ec31 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -312,11 +312,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else {
           // For lower API versions, the topic id may not be included in the 
request.
           // In this case, we resolve the topic id from metadata cache to 
ensure that the topic exists.
-          // If the topic doesn't exist, the currentTopicId will fallback to 
ZERO_UUID.
-          val currentTopicId = metadataCache.getTopicId(topic.name)
-          topic.setTopicId(currentTopicId)
+          // If the topic doesn't exist, the topicId will fallback to 
ZERO_UUID.
+          if (!useTopicIds) {
+            topic.setTopicId(metadataCache.getTopicId(topic.name))
+          }
 
-          if (currentTopicId == Uuid.ZERO_UUID) {
+          if (topic.topicId == Uuid.ZERO_UUID) {
             // If the topic is unknown, we add the topic and all its partitions
             // to the response with UNKNOWN_TOPIC_OR_PARTITION.
             
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
@@ -324,7 +325,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           } else {
             // Otherwise, we check all partitions to ensure that they all 
exist.
             val topicWithValidPartitions = new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
-              .setTopicId(topic.topicId())
+              .setTopicId(topic.topicId)
               .setName(topic.name)
 
             topic.partitions.forEach { partition =>
@@ -332,7 +333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 topicWithValidPartitions.partitions.add(partition)
               } else {
                 responseBuilder.addPartition(
-                  topic.topicId(),
+                  topic.topicId,
                   topic.name,
                   partition.partitionIndex,
                   Errors.UNKNOWN_TOPIC_OR_PARTITION

Reply via email to