This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2b240b43dd MINOR: Revise Topic Resolving Logic in OffsetCommit
Request to Prevent Race Condition (#21809)
b2b240b43dd is described below
commit b2b240b43dd5f87f3d38794f8bda9131964fd9b0
Author: Lucy Liu <[email protected]>
AuthorDate: Wed Mar 18 14:32:30 2026 -0500
MINOR: Revise Topic Resolving Logic in OffsetCommit Request to Prevent Race
Condition (#21809)
This patch slightly revises the logic that resolves the topicId in an offset
commit request. It avoids setting the topicId twice if `topic.topicId`
already exists, avoiding a race condition in between. This is a follow-up
to https://github.com/apache/kafka/pull/21692.
Reviewers: David Jacot <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 54c6395b5f1..6745cb0ec31 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -312,11 +312,12 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// For lower API versions, the topic id may not be included in the
request.
// In this case, we resolve the topic id from metadata cache to
ensure that the topic exists.
- // If the topic doesn't exist, the currentTopicId will fallback to
ZERO_UUID.
- val currentTopicId = metadataCache.getTopicId(topic.name)
- topic.setTopicId(currentTopicId)
+ // If the topic doesn't exist, the topicId will fall back to
ZERO_UUID.
+ if (!useTopicIds) {
+ topic.setTopicId(metadataCache.getTopicId(topic.name))
+ }
- if (currentTopicId == Uuid.ZERO_UUID) {
+ if (topic.topicId == Uuid.ZERO_UUID) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
@@ -324,7 +325,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
// Otherwise, we check all partitions to ensure that they all
exist.
val topicWithValidPartitions = new
OffsetCommitRequestData.OffsetCommitRequestTopic()
- .setTopicId(topic.topicId())
+ .setTopicId(topic.topicId)
.setName(topic.name)
topic.partitions.forEach { partition =>
@@ -332,7 +333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
topicWithValidPartitions.partitions.add(partition)
} else {
responseBuilder.addPartition(
- topic.topicId(),
+ topic.topicId,
topic.name,
partition.partitionIndex,
Errors.UNKNOWN_TOPIC_OR_PARTITION