This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 85be1694df7 KAFKA-20444: [7/N] Wire topic IDs through
OffsetMetadataManager (KIP-1319) (#22249)
85be1694df7 is described below
commit 85be1694df7c5b13b7657ab78e412b3480c23198
Author: David Jacot <[email protected]>
AuthorDate: Mon May 11 09:23:27 2026 +0200
KAFKA-20444: [7/N] Wire topic IDs through OffsetMetadataManager (KIP-1319)
(#22249)
This patch wires the topic ID populated by `KafkaApis` directly through
`OffsetMetadataManager`. The topic ID is now also persisted in the
offset record and echoed back on the `TxnOffsetCommit` response. The
previous metadata image lookup is removed.
Reviewers: Sean Quah <[email protected]>
---
.../coordinator/group/GroupCoordinatorShard.java | 2 -
.../coordinator/group/OffsetMetadataManager.java | 51 +++---------------
.../group/GroupCoordinatorShardTest.java | 8 ---
.../group/OffsetMetadataManagerTest.java | 60 +++++-----------------
.../TransactionalOffsetFetchBenchmark.java | 15 ------
5 files changed, 21 insertions(+), 115 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index cb1966db578..eee4cc728d1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -1081,7 +1081,6 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
public void onLoaded(CoordinatorMetadataImage newImage) {
CoordinatorMetadataDelta emptyDelta = newImage.emptyDelta();
groupMetadataManager.onMetadataUpdate(emptyDelta, newImage);
- offsetMetadataManager.onMetadataUpdate(emptyDelta, newImage);
coordinatorMetrics.activateMetricsShard(metricsShard);
groupMetadataManager.onLoaded();
@@ -1106,7 +1105,6 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
@Override
public void onMetadataUpdate(CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage) {
groupMetadataManager.onMetadataUpdate(delta, newImage);
- offsetMetadataManager.onMetadataUpdate(delta, newImage);
}
private static OffsetCommitKey convertLegacyOffsetCommitKey(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index ede54a05fab..e30b036c66e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -39,8 +39,6 @@ import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.LogContext;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
-import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
@@ -86,7 +84,6 @@ public class OffsetMetadataManager {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private GroupMetadataManager groupMetadataManager = null;
- private CoordinatorMetadataImage metadataImage = null;
private GroupCoordinatorConfig config = null;
private GroupCoordinatorMetricsShard metrics = null;
@@ -115,11 +112,6 @@ public class OffsetMetadataManager {
return this;
}
- public Builder withMetadataImage(CoordinatorMetadataImage
metadataImage) {
- this.metadataImage = metadataImage;
- return this;
- }
-
public Builder
withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
this.metrics = metrics;
return this;
@@ -128,7 +120,6 @@ public class OffsetMetadataManager {
public OffsetMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
- if (metadataImage == null) metadataImage =
CoordinatorMetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
if (groupMetadataManager == null) {
@@ -143,7 +134,6 @@ public class OffsetMetadataManager {
snapshotRegistry,
logContext,
time,
- metadataImage,
groupMetadataManager,
config,
metrics
@@ -166,11 +156,6 @@ public class OffsetMetadataManager {
*/
private final Time time;
- /**
- * The metadata image.
- */
- private CoordinatorMetadataImage metadataImage;
-
/**
* The group metadata manager.
*/
@@ -432,7 +417,6 @@ public class OffsetMetadataManager {
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
- CoordinatorMetadataImage metadataImage,
GroupMetadataManager groupMetadataManager,
GroupCoordinatorConfig config,
GroupCoordinatorMetricsShard metrics
@@ -440,7 +424,6 @@ public class OffsetMetadataManager {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(OffsetMetadataManager.class);
this.time = time;
- this.metadataImage = metadataImage;
this.groupMetadataManager = groupMetadataManager;
this.config = config;
this.metrics = metrics;
@@ -706,21 +689,11 @@ public class OffsetMetadataManager {
final long currentTimeMs = time.milliseconds();
request.topics().forEach(topic -> {
- final TxnOffsetCommitResponseTopic topicResponse = new
TxnOffsetCommitResponseTopic().setName(topic.name());
+ final TxnOffsetCommitResponseTopic topicResponse = new
TxnOffsetCommitResponseTopic()
+ .setName(topic.name())
+ .setTopicId(topic.topicId());
response.topics().add(topicResponse);
- // Resolve topicId from the metadata image.
- final Uuid resolvedTopicId = metadataImage
- .topicMetadata(topic.name())
- .map(CoordinatorMetadataImage.TopicMetadata::id)
- .orElse(Uuid.ZERO_UUID);
-
- // If the topic doesn't exist in metadata, and we need to validate
the member's assignment,
- // throw ILLEGAL_GENERATION.
- if (resolvedTopicId.equals(Uuid.ZERO_UUID) && validator !=
CommitPartitionValidator.NO_OP) {
- throw Errors.ILLEGAL_GENERATION.exception();
- }
-
topic.partitions().forEach(partition -> {
if (isMetadataInvalid(partition.committedMetadata())) {
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
@@ -731,7 +704,7 @@ public class OffsetMetadataManager {
try {
validator.validate(
topic.name(),
- resolvedTopicId,
+ topic.topicId(),
partition.partitionIndex()
);
} catch (StaleMemberEpochException ex) {
@@ -743,8 +716,8 @@ public class OffsetMetadataManager {
throw Errors.ILLEGAL_GENERATION.exception();
}
- log.debug("[GroupId {}] Committing transactional offsets
{} for partition {}-{} from member {} with leader epoch {}.",
- request.groupId(), partition.committedOffset(),
topic.name(), partition.partitionIndex(),
+ log.debug("[GroupId {}] Committing transactional offsets
{} for partition {}-{}-{} from member {} with leader epoch {}.",
+ request.groupId(), partition.committedOffset(),
topic.topicId(), topic.name(), partition.partitionIndex(),
request.memberId(), partition.committedLeaderEpoch());
topicResponse.partitions().add(new
TxnOffsetCommitResponsePartition()
@@ -752,7 +725,7 @@ public class OffsetMetadataManager {
.setErrorCode(Errors.NONE.code()));
final OffsetAndMetadata offsetAndMetadata =
OffsetAndMetadata.fromRequest(
- Uuid.ZERO_UUID,
+ topic.topicId(),
partition,
currentTimeMs
);
@@ -1308,16 +1281,6 @@ public class OffsetMetadataManager {
}
}
- /**
- * A new metadata image is available.
- *
- * @param delta The delta image.
- * @param newImage The new metadata image.
- */
- public void onMetadataUpdate(CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage) {
- this.metadataImage = newImage;
- }
-
/**
* @return The offset for the provided groupId and topic partition or null
* if it does not exist.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 8747c33bc4e..8fc746605d3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1273,10 +1273,6 @@ public class GroupCoordinatorShardTest {
any(), eq(image)
);
- verify(offsetMetadataManager, times(1)).onMetadataUpdate(
- any(), eq(image)
- );
-
verify(groupMetadataManager, times(1)).onLoaded();
}
@@ -1304,10 +1300,6 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager, times(1)).onMetadataUpdate(
eq(delta), eq(image)
);
-
- verify(offsetMetadataManager, times(1)).onMetadataUpdate(
- eq(delta), eq(image)
- );
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index b0f3170e7e2..484a18d9393 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -167,7 +167,6 @@ public class OffsetMetadataManagerTest {
.withTime(time)
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
- .withMetadataImage(metadataImage)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(config)
.withGroupCoordinatorMetricsShard(metrics)
@@ -1641,8 +1640,9 @@ public class OffsetMetadataManagerTest {
);
}
- @Test
- public void testConsumerGroupTransactionalOffsetCommit() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testConsumerGroupTransactionalOffsetCommit(Uuid topicId) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
@@ -1658,7 +1658,7 @@ public class OffsetMetadataManagerTest {
.build()
);
- verifyTransactionalOffsetCommit(context);
+ verifyTransactionalOffsetCommit(topicId, context);
}
@ParameterizedTest
@@ -1694,6 +1694,7 @@ public class OffsetMetadataManagerTest {
.setMemberId("member")
.setTopics(List.of(
new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setTopicId(barTopicId)
.setName(barTopicName)
.setPartitions(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
@@ -1719,6 +1720,7 @@ public class OffsetMetadataManagerTest {
new TxnOffsetCommitResponseData()
.setTopics(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(barTopicId)
.setName(barTopicName)
.setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
@@ -1730,8 +1732,9 @@ public class OffsetMetadataManagerTest {
);
}
- @Test
- public void testStreamsGroupTransactionalOffsetCommit() {
+ @ParameterizedTest
+ @MethodSource("uuids")
+ public void testStreamsGroupTransactionalOffsetCommit(Uuid topicId) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
@@ -1746,10 +1749,10 @@ public class OffsetMetadataManagerTest {
.build()
);
- verifyTransactionalOffsetCommit(context);
+ verifyTransactionalOffsetCommit(topicId, context);
}
- private static void
verifyTransactionalOffsetCommit(OffsetMetadataManagerTestContext context) {
+ private static void verifyTransactionalOffsetCommit(Uuid topicId,
OffsetMetadataManagerTestContext context) {
CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord>
result = context.commitTransactionalOffset(
new TxnOffsetCommitRequestData()
.setGroupId("foo")
@@ -1758,6 +1761,7 @@ public class OffsetMetadataManagerTest {
.setTopics(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
.setName("bar")
+ .setTopicId(topicId)
.setPartitions(List.of(
new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
.setPartitionIndex(0)
@@ -1773,6 +1777,7 @@ public class OffsetMetadataManagerTest {
.setTopics(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
.setName("bar")
+ .setTopicId(topicId)
.setPartitions(List.of(
new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
.setPartitionIndex(0)
@@ -1793,7 +1798,7 @@ public class OffsetMetadataManagerTest {
"metadata",
context.time.milliseconds(),
OptionalLong.empty(),
- Uuid.ZERO_UUID
+ topicId
)
)),
result.records()
@@ -1894,43 +1899,6 @@ public class OffsetMetadataManagerTest {
verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
}
- @Test
- public void
testConsumerGroupTransactionalOffsetCommitWithUnresolvedTopicId() {
- OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- Uuid topicId = Uuid.randomUuid();
-
- ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
- "foo",
- true
- );
-
- group.updateMember(new ConsumerGroupMember.Builder("member")
- .setMemberEpoch(10)
- .setPreviousMemberEpoch(10)
- .setAssignedPartitions(mkAssignmentWithEpochs(
- mkTopicAssignmentWithEpochs(topicId, 5, 0)))
- .build()
- );
-
- // When topicId couldn't be resolved and fall back to ZERO_UUID, since
NO_OP validator is not used,
- // ILLEGAL_GENERATION is thrown.
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
- new TxnOffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(7)
- .setTopics(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- ))
- ))
- ));
- }
-
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
public void
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
index d83bef10820..d2612c8572e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/TransactionalOffsetFetchBenchmark.java
@@ -16,12 +16,9 @@
*/
package org.apache.kafka.jmh.coordinator;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetFetchRequestData;
-import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.LogContext;
-import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
@@ -29,9 +26,6 @@ import
org.apache.kafka.coordinator.group.OffsetMetadataManager;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
-import org.apache.kafka.image.MetadataDelta;
-import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.openjdk.jmh.annotations.Benchmark;
@@ -83,14 +77,6 @@ public class TransactionalOffsetFetchBenchmark {
@Setup(Level.Trial)
public void setup() {
LogContext logContext = new LogContext();
- MetadataDelta delta = new MetadataDelta.Builder()
- .setImage(MetadataImage.EMPTY)
- .build();
- delta.replay(new TopicRecord()
- .setTopicId(Uuid.randomUuid())
- .setName(TOPIC_NAME));
- MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
-
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
@@ -103,7 +89,6 @@ public class TransactionalOffsetFetchBenchmark {
.withTime(TIME)
.withGroupMetadataManager(groupMetadataManager)
.withGroupCoordinatorConfig(mock(GroupCoordinatorConfig.class))
- .withMetadataImage(new KRaftCoordinatorMetadataImage(image))
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
.build();