This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85be1694df7 KAFKA-20444: [7/N] Wire topic IDs through 
OffsetMetadataManager (KIP-1319) (#22249)
85be1694df7 is described below

commit 85be1694df7c5b13b7657ab78e412b3480c23198
Author: David Jacot <[email protected]>
AuthorDate: Mon May 11 09:23:27 2026 +0200

    KAFKA-20444: [7/N] Wire topic IDs through OffsetMetadataManager (KIP-1319) 
(#22249)
    
    This patch wires the topic ID populated by `KafkaApis` directly through
    `OffsetMetadataManager`. The topic ID is now also persisted in the
    offset record and echoed back on the `TxnOffsetCommit` response. The
    previous metadata image lookup is removed.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../coordinator/group/GroupCoordinatorShard.java   |  2 -
 .../coordinator/group/OffsetMetadataManager.java   | 51 +++---------------
 .../group/GroupCoordinatorShardTest.java           |  8 ---
 .../group/OffsetMetadataManagerTest.java           | 60 +++++-----------------
 .../TransactionalOffsetFetchBenchmark.java         | 15 ------
 5 files changed, 21 insertions(+), 115 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index cb1966db578..eee4cc728d1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -1081,7 +1081,6 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     public void onLoaded(CoordinatorMetadataImage newImage) {
         CoordinatorMetadataDelta emptyDelta = newImage.emptyDelta();
         groupMetadataManager.onMetadataUpdate(emptyDelta, newImage);
-        offsetMetadataManager.onMetadataUpdate(emptyDelta, newImage);
         coordinatorMetrics.activateMetricsShard(metricsShard);
 
         groupMetadataManager.onLoaded();
@@ -1106,7 +1105,6 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     @Override
     public void onMetadataUpdate(CoordinatorMetadataDelta delta, 
CoordinatorMetadataImage newImage) {
         groupMetadataManager.onMetadataUpdate(delta, newImage);
-        offsetMetadataManager.onMetadataUpdate(delta, newImage);
     }
 
     private static OffsetCommitKey convertLegacyOffsetCommitKey(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index ede54a05fab..e30b036c66e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -39,8 +39,6 @@ import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.internals.LogContext;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
 import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -86,7 +84,6 @@ public class OffsetMetadataManager {
         private SnapshotRegistry snapshotRegistry = null;
         private Time time = null;
         private GroupMetadataManager groupMetadataManager = null;
-        private CoordinatorMetadataImage metadataImage = null;
         private GroupCoordinatorConfig config = null;
         private GroupCoordinatorMetricsShard metrics = null;
 
@@ -115,11 +112,6 @@ public class OffsetMetadataManager {
             return this;
         }
 
-        public Builder withMetadataImage(CoordinatorMetadataImage 
metadataImage) {
-            this.metadataImage = metadataImage;
-            return this;
-        }
-
         public Builder 
withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
             this.metrics = metrics;
             return this;
@@ -128,7 +120,6 @@ public class OffsetMetadataManager {
         public OffsetMetadataManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
-            if (metadataImage == null) metadataImage = 
CoordinatorMetadataImage.EMPTY;
             if (time == null) time = Time.SYSTEM;
 
             if (groupMetadataManager == null) {
@@ -143,7 +134,6 @@ public class OffsetMetadataManager {
                 snapshotRegistry,
                 logContext,
                 time,
-                metadataImage,
                 groupMetadataManager,
                 config,
                 metrics
@@ -166,11 +156,6 @@ public class OffsetMetadataManager {
      */
     private final Time time;
 
-    /**
-     * The metadata image.
-     */
-    private CoordinatorMetadataImage metadataImage;
-
     /**
      * The group metadata manager.
      */
@@ -432,7 +417,6 @@ public class OffsetMetadataManager {
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         Time time,
-        CoordinatorMetadataImage metadataImage,
         GroupMetadataManager groupMetadataManager,
         GroupCoordinatorConfig config,
         GroupCoordinatorMetricsShard metrics
@@ -440,7 +424,6 @@ public class OffsetMetadataManager {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(OffsetMetadataManager.class);
         this.time = time;
-        this.metadataImage = metadataImage;
         this.groupMetadataManager = groupMetadataManager;
         this.config = config;
         this.metrics = metrics;
@@ -706,21 +689,11 @@ public class OffsetMetadataManager {
         final long currentTimeMs = time.milliseconds();
 
         request.topics().forEach(topic -> {
-            final TxnOffsetCommitResponseTopic topicResponse = new 
TxnOffsetCommitResponseTopic().setName(topic.name());
+            final TxnOffsetCommitResponseTopic topicResponse = new 
TxnOffsetCommitResponseTopic()
+                .setName(topic.name())
+                .setTopicId(topic.topicId());
             response.topics().add(topicResponse);
 
-            // Resolve topicId from the metadata image.
-            final Uuid resolvedTopicId = metadataImage
-                .topicMetadata(topic.name())
-                .map(CoordinatorMetadataImage.TopicMetadata::id)
-                .orElse(Uuid.ZERO_UUID);
-
-            // If the topic doesn't exist in metadata, and we need to validate 
the member's assignment,
-            // throw ILLEGAL_GENERATION.
-            if (resolvedTopicId.equals(Uuid.ZERO_UUID) && validator != 
CommitPartitionValidator.NO_OP) {
-                throw Errors.ILLEGAL_GENERATION.exception();
-            }
-
             topic.partitions().forEach(partition -> {
                 if (isMetadataInvalid(partition.committedMetadata())) {
                     topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
@@ -731,7 +704,7 @@ public class OffsetMetadataManager {
                     try {
                         validator.validate(
                             topic.name(),
-                            resolvedTopicId,
+                            topic.topicId(),
                             partition.partitionIndex()
                         );
                     } catch (StaleMemberEpochException ex) {
@@ -743,8 +716,8 @@ public class OffsetMetadataManager {
                         throw Errors.ILLEGAL_GENERATION.exception();
                     }
 
-                    log.debug("[GroupId {}] Committing transactional offsets 
{} for partition {}-{} from member {} with leader epoch {}.",
-                        request.groupId(), partition.committedOffset(), 
topic.name(), partition.partitionIndex(),
+                    log.debug("[GroupId {}] Committing transactional offsets 
{} for partition {}-{}-{} from member {} with leader epoch {}.",
+                        request.groupId(), partition.committedOffset(), 
topic.topicId(), topic.name(), partition.partitionIndex(),
                         request.memberId(), partition.committedLeaderEpoch());
 
                     topicResponse.partitions().add(new 
TxnOffsetCommitResponsePartition()
@@ -752,7 +725,7 @@ public class OffsetMetadataManager {
                         .setErrorCode(Errors.NONE.code()));
 
                     final OffsetAndMetadata offsetAndMetadata = 
OffsetAndMetadata.fromRequest(
-                        Uuid.ZERO_UUID,
+                        topic.topicId(),
                         partition,
                         currentTimeMs
                     );
@@ -1308,16 +1281,6 @@ public class OffsetMetadataManager {
         }
     }
 
-    /**
-     * A new metadata image is available.
-     *
-     * @param delta    The delta image.
-     * @param newImage The new metadata image.
-     */
-    public void onMetadataUpdate(CoordinatorMetadataDelta delta, 
CoordinatorMetadataImage newImage) {
-        this.metadataImage = newImage;
-    }
-
     /**
      * @return The offset for the provided groupId and topic partition or null
      * if it does not exist.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 8747c33bc4e..8fc746605d3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1273,10 +1273,6 @@ public class GroupCoordinatorShardTest {
             any(), eq(image)
         );
 
-        verify(offsetMetadataManager, times(1)).onMetadataUpdate(
-            any(), eq(image)
-        );
-
         verify(groupMetadataManager, times(1)).onLoaded();
     }
 
@@ -1304,10 +1300,6 @@ public class GroupCoordinatorShardTest {
         verify(groupMetadataManager, times(1)).onMetadataUpdate(
             eq(delta), eq(image)
         );
-
-        verify(offsetMetadataManager, times(1)).onMetadataUpdate(
-            eq(delta), eq(image)
-        );
     }
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index b0f3170e7e2..484a18d9393 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -167,7 +167,6 @@ public class OffsetMetadataManagerTest {
                     .withTime(time)
                     .withLogContext(logContext)
                     .withSnapshotRegistry(snapshotRegistry)
-                    .withMetadataImage(metadataImage)
                     .withGroupMetadataManager(groupMetadataManager)
                     .withGroupCoordinatorConfig(config)
                     .withGroupCoordinatorMetricsShard(metrics)
@@ -1641,8 +1640,9 @@ public class OffsetMetadataManagerTest {
         );
     }
 
-    @Test
-    public void testConsumerGroupTransactionalOffsetCommit() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testConsumerGroupTransactionalOffsetCommit(Uuid topicId) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
@@ -1658,7 +1658,7 @@ public class OffsetMetadataManagerTest {
             .build()
         );
 
-        verifyTransactionalOffsetCommit(context);
+        verifyTransactionalOffsetCommit(topicId, context);
     }
 
     @ParameterizedTest
@@ -1694,6 +1694,7 @@ public class OffsetMetadataManagerTest {
             .setMemberId("member")
             .setTopics(List.of(
                 new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                    .setTopicId(barTopicId)
                     .setName(barTopicName)
                     .setPartitions(List.of(
                         new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1719,6 +1720,7 @@ public class OffsetMetadataManagerTest {
             new TxnOffsetCommitResponseData()
                 .setTopics(List.of(
                     new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                        .setTopicId(barTopicId)
                         .setName(barTopicName)
                         .setPartitions(List.of(
                             new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
@@ -1730,8 +1732,9 @@ public class OffsetMetadataManagerTest {
         );
     }
 
-    @Test
-    public void testStreamsGroupTransactionalOffsetCommit() {
+    @ParameterizedTest
+    @MethodSource("uuids")
+    public void testStreamsGroupTransactionalOffsetCommit(Uuid topicId) {
         OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
 
         // Create an empty group.
@@ -1746,10 +1749,10 @@ public class OffsetMetadataManagerTest {
             .build()
         );
 
-        verifyTransactionalOffsetCommit(context);
+        verifyTransactionalOffsetCommit(topicId, context);
     }
 
-    private static void 
verifyTransactionalOffsetCommit(OffsetMetadataManagerTestContext context) {
+    private static void verifyTransactionalOffsetCommit(Uuid topicId, 
OffsetMetadataManagerTestContext context) {
         CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> 
result = context.commitTransactionalOffset(
             new TxnOffsetCommitRequestData()
                 .setGroupId("foo")
@@ -1758,6 +1761,7 @@ public class OffsetMetadataManagerTest {
                 .setTopics(List.of(
                     new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                         .setName("bar")
+                        .setTopicId(topicId)
                         .setPartitions(List.of(
                             new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                                 .setPartitionIndex(0)
@@ -1773,6 +1777,7 @@ public class OffsetMetadataManagerTest {
                 .setTopics(List.of(
                     new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                         .setName("bar")
+                        .setTopicId(topicId)
                         .setPartitions(List.of(
                             new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                                 .setPartitionIndex(0)
@@ -1793,7 +1798,7 @@ public class OffsetMetadataManagerTest {
                     "metadata",
                     context.time.milliseconds(),
                     OptionalLong.empty(),
-                    Uuid.ZERO_UUID
+                    topicId
                 )
             )),
             result.records()
@@ -1894,43 +1899,6 @@ public class OffsetMetadataManagerTest {
         verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
     }
 
-    @Test
-    public void 
testConsumerGroupTransactionalOffsetCommitWithUnresolvedTopicId() {
-        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
-        Uuid topicId = Uuid.randomUuid();
-
-        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
-            "foo",
-            true
-        );
-
-        group.updateMember(new ConsumerGroupMember.Builder("member")
-            .setMemberEpoch(10)
-            .setPreviousMemberEpoch(10)
-            .setAssignedPartitions(mkAssignmentWithEpochs(
-                mkTopicAssignmentWithEpochs(topicId, 5, 0)))
-            .build()
-        );
-
-        // When topicId couldn't be resolved and fall back to ZERO_UUID, since 
NO_OP validator is not used,
-        // ILLEGAL_GENERATION is thrown.
-        assertThrows(IllegalGenerationException.class, () -> 
context.commitTransactionalOffset(
-            new TxnOffsetCommitRequestData()
-                .setGroupId("foo")
-                .setMemberId("member")
-                .setGenerationIdOrMemberEpoch(7)
-                .setTopics(List.of(
-                    new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
-                        .setName("bar")
-                        .setPartitions(List.of(
-                            new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
-                                .setPartitionIndex(0)
-                                .setCommittedOffset(100L)
-                        ))
-                ))
-        ));
-    }
-
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
     public void 
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index d83bef10820..d2612c8572e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -16,12 +16,9 @@
  */
 package org.apache.kafka.jmh.coordinator;
 
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
-import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.internals.LogContext;
-import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.GroupMetadataManager;
@@ -29,9 +26,6 @@ import 
org.apache.kafka.coordinator.group.OffsetMetadataManager;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
 import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.openjdk.jmh.annotations.Benchmark;
@@ -83,14 +77,6 @@ public class TransactionalOffsetFetchBenchmark {
     @Setup(Level.Trial)
     public void setup() {
         LogContext logContext = new LogContext();
-        MetadataDelta delta = new MetadataDelta.Builder()
-            .setImage(MetadataImage.EMPTY)
-            .build();
-        delta.replay(new TopicRecord()
-            .setTopicId(Uuid.randomUuid())
-            .setName(TOPIC_NAME));
-        MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
-
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
 
         GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
@@ -103,7 +89,6 @@ public class TransactionalOffsetFetchBenchmark {
             .withTime(TIME)
             .withGroupMetadataManager(groupMetadataManager)
             .withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
-            .withMetadataImage(new KRaftCoordinatorMetadataImage(image))
             
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
             .build();
 

Reply via email to