This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 83976543fef KAFKA-20444: [10/N] Wire topic IDs through TransactionManager (KIP-1319) (#22443)
83976543fef is described below

commit 83976543fef1fe942bf021c8eec787c75dd58bdd
Author: David Jacot <[email protected]>
AuthorDate: Tue Jun 2 16:33:29 2026 +0200

    KAFKA-20444: [10/N] Wire topic IDs through TransactionManager (KIP-1319) (#22443)
    
    This patch lets `TxnOffsetCommit` requests negotiate v6+ (KIP-1319) when
    the producer's metadata cache knows the topic ids for every topic in the
    offsets map.
    
    Key changes:
    - `TransactionManager` gains a `Metadata` reference (constructor
      parameter wired from `KafkaProducer`). It is used to look up topic
      ids when building a `TxnOffsetCommit` request.
    - `txnOffsetCommitHandler` makes a single pass over the offsets map:
      it populates `pendingTxnOffsetCommits`, builds each request topic
      (via `computeIfAbsent` over a name -> topic map, with the topic id
      resolved from `metadata.topicIds()`), and accumulates a snapshot
      `Map<Uuid, String> topicNamesByIds` for the response handler. When
      every topic has a non-zero id, the builder is chosen via
      `forTopicIdsOrNames(...)` so v6 can be negotiated; otherwise it
      falls back to `forTopicNames(...)` and caps at v5.
    - The response handler resolves the topic name from the snapshot
      (not the live metadata cache) when a v6+ response topic omits the
      name on the wire. If the id is unknown to the snapshot, the
      handler logs a warning and skips that topic so the retry loop is
      unaffected.
    
    Test changes:
    - `prepareTxnOffsetCommitResponse` in `TransactionManagerTest` now
      asserts that the inspected request is at version < 6 and that
      every request topic carries a non-empty name. This strengthens the
      existing v5-path coverage.
    - Three new tests cover the v6 path. They seed the metadata cache
      with a known topic id via `RequestTestUtils.metadataUpdateWithIds`
      and then assert the request's negotiated version, topic ids, and
      error handling:
      `testTxnOffsetCommitNegotiatesV6WhenAllTopicIdsAreAvailable`,
      `testTxnOffsetCommitDowngradesToV5WhenAnyTopicIdIsMissing`, and
      `testTxnOffsetCommitRetriesOnUnknownTopicIdAtV6`.
    
    `KafkaProducer.sendOffsetsToTransaction` will be updated in a follow-up
    to refresh the metadata cache before delegating, so the v6 negotiation
    actually fires for topics that the producer has not seen yet.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  27 ++---
 .../producer/internals/TransactionManager.java     |  84 ++++++++++---
 .../clients/producer/internals/SenderTest.java     |  39 ++++---
 .../producer/internals/TransactionManagerTest.java | 130 ++++++++++++++++++++-
 4 files changed, 231 insertions(+), 49 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 63303f8e089..cd495956269 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,6 +427,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
 
             this.apiVersions = apiVersions;
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
+            if (metadata != null) {
+                this.metadata = metadata;
+            } else {
+                this.metadata = new ProducerMetadata(retryBackoffMs,
+                        retryBackoffMaxMs,
+                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
+                        
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
+                        logContext,
+                        clusterResourceListeners,
+                        Time.SYSTEM);
+                this.metadata.bootstrap(addresses);
+            }
             this.transactionManager = configureTransactionState(config, 
logContext);
             // There is no need to do work required for adaptive partitioning, 
if we use a custom partitioner.
             boolean enableAdaptivePartitioning = partitionerPlugin.get() == 
null &&
@@ -454,19 +467,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
                     transactionManager,
                     new BufferPool(this.totalMemorySize, batchSize, metrics, 
time, PRODUCER_METRIC_GROUP_NAME));
 
-            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
-            if (metadata != null) {
-                this.metadata = metadata;
-            } else {
-                this.metadata = new ProducerMetadata(retryBackoffMs,
-                        retryBackoffMaxMs,
-                        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
-                        
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
-                        logContext,
-                        clusterResourceListeners,
-                        Time.SYSTEM);
-                this.metadata.bootstrap(addresses);
-            }
             this.errors = this.metrics.sensor("errors");
             this.sender = newSender(logContext, kafkaClient, this.metadata);
             String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
@@ -622,6 +622,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 transactionTimeoutMs,
                 retryBackoffMs,
                 apiVersions,
+                metadata,
                 enable2PC
             );
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 2b10ceac6f3..266bdeb0270 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -49,6 +51,7 @@ import 
org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import 
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -100,6 +103,7 @@ public class TransactionManager {
     private final String transactionalId;
     private final int transactionTimeoutMs;
     private final ApiVersions apiVersions;
+    private final Metadata metadata;
 
     private final TxnPartitionMap txnPartitionMap;
 
@@ -231,6 +235,7 @@ public class TransactionManager {
                               final int transactionTimeoutMs,
                               final long retryBackoffMs,
                               final ApiVersions apiVersions,
+                              final Metadata metadata,
                               final boolean enable2PC) {
         this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
         this.transactionalId = transactionalId;
@@ -248,6 +253,7 @@ public class TransactionManager {
         this.retryBackoffMs = retryBackoffMs;
         this.txnPartitionMap = new TxnPartitionMap(logContext);
         this.apiVersions = apiVersions;
+        this.metadata = metadata;
         this.enable2PC = enable2PC;
     }
 
@@ -1243,14 +1249,43 @@ public class TransactionManager {
     private TxnOffsetCommitHandler 
txnOffsetCommitHandler(TransactionalRequestResult result,
                                                           Map<TopicPartition, 
OffsetAndMetadata> offsets,
                                                           
ConsumerGroupMetadata groupMetadata) {
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
offsets.entrySet()) {
-            OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            CommittedOffset committedOffset = new 
CommittedOffset(offsetAndMetadata.offset(),
-                    offsetAndMetadata.metadata(), 
offsetAndMetadata.leaderEpoch());
-            pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
+        // Resolve topic ids from the metadata cache at request build time.
+        // This is a non-blocking lookup: a topic the cache does not know yet
+        // resolves to Uuid.ZERO_UUID and the request falls back to topic names.
+        var topicIds = metadata.topicIds();
+        var requestTopicsByName = new HashMap<String, 
TxnOffsetCommitRequestTopic>();
+        var topicNamesByIds = new HashMap<Uuid, String>();
+        var topics = new ArrayList<TxnOffsetCommitRequestTopic>();
+        var allHaveTopicIds = true;
+        for (var entry : offsets.entrySet()) {
+            var tp = entry.getKey();
+            var offsetAndMetadata = entry.getValue();
+            pendingTxnOffsetCommits.put(
+                tp,
+                new CommittedOffset(
+                    offsetAndMetadata.offset(),
+                    offsetAndMetadata.metadata(),
+                    offsetAndMetadata.leaderEpoch()
+                )
+            );
+            var topicId = topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
+            allHaveTopicIds &= !topicId.equals(Uuid.ZERO_UUID);
+            var topic = requestTopicsByName.computeIfAbsent(tp.topic(), name 
-> {
+                var t = new 
TxnOffsetCommitRequestTopic().setName(name).setTopicId(topicId);
+                topics.add(t);
+                if (!topicId.equals(Uuid.ZERO_UUID)) {
+                    topicNamesByIds.put(topicId, name);
+                }
+                return t;
+            });
+            topic.partitions().add(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                .setPartitionIndex(tp.partition())
+                .setCommittedOffset(offsetAndMetadata.offset())
+                
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+                .setCommittedMetadata(offsetAndMetadata.metadata()));
         }
 
-        final TxnOffsetCommitRequestData data = new 
TxnOffsetCommitRequestData()
+        var data = new TxnOffsetCommitRequestData()
             .setTransactionalId(transactionalId)
             .setGroupId(groupMetadata.groupId())
             .setProducerId(producerIdAndEpoch.producerId)
@@ -1258,14 +1293,15 @@ public class TransactionManager {
             .setMemberId(groupMetadata.memberId())
             .setGenerationIdOrMemberEpoch(groupMetadata.generationId())
             .setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
-            
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
-        final TxnOffsetCommitRequest.Builder builder =
-            TxnOffsetCommitRequest.Builder.forTopicNames(data, 
isTransactionV2Enabled());
+            .setTopics(topics);
+        var builder = allHaveTopicIds
+            ? TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data, 
isTransactionV2Enabled(), true)
+            : TxnOffsetCommitRequest.Builder.forTopicNames(data, 
isTransactionV2Enabled());
         if (result == null) {
             // In this case, transaction V2 is in use.
-            return new TxnOffsetCommitHandler(builder);
+            return new TxnOffsetCommitHandler(builder, topicNamesByIds);
         }
-        return new TxnOffsetCommitHandler(result, builder);
+        return new TxnOffsetCommitHandler(result, builder, topicNamesByIds);
     }
 
     private void throwIfPendingState(TransactionOperation operation) {
@@ -1877,16 +1913,25 @@ public class TransactionManager {
 
     private class TxnOffsetCommitHandler extends TxnRequestHandler {
         private final TxnOffsetCommitRequest.Builder builder;
+        // Snapshot of the topic-id -> name map captured when the request was
+        // built. v6+ responses omit the topic name on the wire, so we resolve
+        // it via this snapshot rather than the live metadata cache (which may
+        // have changed by the time the response arrives).
+        private final Map<Uuid, String> topicNamesByIds;
 
         private TxnOffsetCommitHandler(TransactionalRequestResult result,
-                                       TxnOffsetCommitRequest.Builder builder) 
{
+                                       TxnOffsetCommitRequest.Builder builder,
+                                       Map<Uuid, String> topicNamesByIds) {
             super(result);
             this.builder = builder;
+            this.topicNamesByIds = topicNamesByIds;
         }
 
-        private TxnOffsetCommitHandler(TxnOffsetCommitRequest.Builder builder) 
{
+        private TxnOffsetCommitHandler(TxnOffsetCommitRequest.Builder builder,
+                                       Map<Uuid, String> topicNamesByIds) {
             super("TxnOffsetCommitHandler");
             this.builder = builder;
+            this.topicNamesByIds = topicNamesByIds;
         }
 
         @Override
@@ -1909,6 +1954,7 @@ public class TransactionManager {
             return builder.data.groupId();
         }
 
+        @SuppressWarnings({"checkstyle:NPathComplexity"})
         @Override
         public void handleResponse(AbstractResponse response) {
             TxnOffsetCommitResponse txnOffsetCommitResponse = 
(TxnOffsetCommitResponse) response;
@@ -1918,8 +1964,18 @@ public class TransactionManager {
                 builder.data.groupId(), 
txnOffsetCommitResponse.data().topics());
 
             for (TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic 
responseTopic : txnOffsetCommitResponse.data().topics()) {
+                // v6+ responses omit the topic name on the wire; fall back to
+                // the snapshot taken at request build time.
+                String topicName = responseTopic.name() != null && 
!responseTopic.name().isEmpty()
+                    ? responseTopic.name()
+                    : topicNamesByIds.get(responseTopic.topicId());
+                if (topicName == null) {
+                    log.warn("Received TxnOffsetCommit response for consumer 
group {} with an unknown topic id {}",
+                        builder.data.groupId(), responseTopic.topicId());
+                    continue;
+                }
                 for 
(TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition responsePartition 
: responseTopic.partitions()) {
-                    TopicPartition topicPartition = new 
TopicPartition(responseTopic.name(), responsePartition.partitionIndex());
+                    TopicPartition topicPartition = new 
TopicPartition(topicName, responsePartition.partitionIndex());
                     Errors error = 
Errors.forCode(responsePartition.errorCode());
                     if (error == Errors.NONE) {
                         pendingTxnOffsetCommits.remove(topicPartition);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 2a4908c1e46..a91adc502ef 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -514,7 +514,7 @@ public class SenderTest {
 
             ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
             apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, metadata, false);
 
             setupWithTransactionState(txnManager);
             doInitTransactions(txnManager, producerIdAndEpoch);
@@ -642,7 +642,7 @@ public class SenderTest {
         // Initialize transaction manager. InitProducerId will be queued up 
until metadata response
         // is processed and FindCoordinator can be sent to `leastLoadedNode`.
         TransactionManager transactionManager = new TransactionManager(new 
LogContext(), "testInitProducerIdWithPendingMetadataRequest",
-                60000, 100L, new ApiVersions(), false);
+                60000, 100L, new ApiVersions(), metadata, false);
         setupWithTransactionState(transactionManager, false, null, false);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, (short) 0);
         transactionManager.initializeTransactions(false);
@@ -694,7 +694,7 @@ public class SenderTest {
         client = new MockClient(time, metadata);
 
         TransactionManager transactionManager = new TransactionManager(new 
LogContext(), "testNodeNotReady",
-                60000, 100L, new ApiVersions(), false);
+                60000, 100L, new ApiVersions(), metadata, false);
         setupWithTransactionState(transactionManager, false, null, true);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, (short) 0);
         transactionManager.initializeTransactions(false);
@@ -1536,7 +1536,7 @@ public class SenderTest {
     public void testUnresolvedSequencesAreNotFatal() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -1821,7 +1821,7 @@ public class SenderTest {
     @Test
     public void 
testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws 
Exception {
         final long producerId = 343434L;
-        TransactionManager transactionManager = new 
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, 
false);
+        TransactionManager transactionManager = new 
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, 
metadata, false);
 
         setupWithTransactionState(transactionManager);
         doInitTransactions(transactionManager, new 
ProducerIdAndEpoch(producerId, (short) 0));
@@ -2390,7 +2390,7 @@ public class SenderTest {
                 TOPIC_IDS.getOrDefault("testSplitBatchAndSend", 
Uuid.ZERO_UUID),
                 new TopicPartition("testSplitBatchAndSend", 1));
 
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2742,7 +2742,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, metadata, 
false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2775,7 +2775,7 @@ public class SenderTest {
             int lingerMs = 50;
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
-            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, metadata, false);
             setupWithTransactionState(txnManager, lingerMs);
 
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2832,7 +2832,7 @@ public class SenderTest {
         try (Metrics m = new Metrics()) {
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
-            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, metadata, false);
             setupWithTransactionState(txnManager);
 
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2903,7 +2903,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, metadata, 
false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2937,7 +2937,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, metadata, 
false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2968,7 +2968,7 @@ public class SenderTest {
     @Test
     public void testTransactionAbortedExceptionOnAbortWithoutError() throws 
InterruptedException {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, 
false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, 
metadata, false);
 
         setupWithTransactionState(txnManager, false, null);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2994,7 +2994,7 @@ public class SenderTest {
     public void testDoNotPollWhenNoRequestSent() {
         client = spy(new MockClient(time, metadata));
 
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, metadata, false);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3006,7 +3006,7 @@ public class SenderTest {
     @Test
     public void testTooLargeBatchesAreSafelyRemoved() throws 
InterruptedException {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager, false, null);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3059,6 +3059,7 @@ public class SenderTest {
                 60000,
                 RETRY_BACKOFF_MS,
                 apiVersions,
+                metadata,
                 false
         );
 
@@ -3129,7 +3130,7 @@ public class SenderTest {
     public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3179,7 +3180,7 @@ public class SenderTest {
     public void testInvalidTxnStateIsAnAbortableError() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testInvalidTxnState", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testInvalidTxnState", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3218,7 +3219,7 @@ public class SenderTest {
     public void testTransactionAbortableExceptionIsAnAbortableError() throws 
Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"textTransactionAbortableException", 60000, 100, apiVersions, false);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"textTransactionAbortableException", 60000, 100, apiVersions, metadata, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3257,7 +3258,7 @@ public class SenderTest {
     public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws 
Exception {
 
         // Initialize and begin transaction
-        TransactionManager transactionManager = new 
TransactionManager(logContext, 
"testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100, 
apiVersions, false);
+        TransactionManager transactionManager = new 
TransactionManager(logContext, 
"testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100, 
apiVersions, metadata, false);
         setupWithTransactionState(transactionManager);
         doInitTransactions(transactionManager, new ProducerIdAndEpoch(1L, 
(short) 0));
         transactionManager.beginTransaction();
@@ -3818,7 +3819,7 @@ public class SenderTest {
     }
 
     private TransactionManager createTransactionManager() {
-        return new TransactionManager(new LogContext(), null, 0, 
RETRY_BACKOFF_MS, new ApiVersions(), false);
+        return new TransactionManager(new LogContext(), null, 0, 
RETRY_BACKOFF_MS, new ApiVersions(), metadata, false);
     }
     
     private void setupWithTransactionState(TransactionManager 
transactionManager) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index db76ee022b6..a85041d955f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MetadataSnapshot;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
@@ -51,6 +52,7 @@ import 
org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -209,7 +211,7 @@ public class TransactionManagerTest {
             finalizedFeaturesEpoch));
         finalizedFeaturesEpoch += 1;
         this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId.orElse(null),
-                transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
enable2pc);
+                transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
this.metadata, enable2pc);
 
 
         int batchSize = 16 * 1024;
@@ -1060,7 +1062,7 @@ public class TransactionManagerTest {
                 .setMinVersionLevel((short) 1)),
             0));
         this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId,
-            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
false);
+            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
this.metadata, false);
 
         int batchSize = 16 * 1024;
         int deliveryTimeoutMs = 3000;
@@ -2585,6 +2587,117 @@ public class TransactionManagerTest {
         testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
     }
 
+    @Test
+    public void testTxnOffsetCommitNegotiatesV6WhenAllTopicIdsAreAvailable() {
+        initializeTransactionManager(Optional.of(transactionalId), true);
+        doInitTransactions();
+        transactionManager.beginTransaction();
+
+        // Seed the metadata cache with the topic id so the request can negotiate v6.
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+            1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(1));
+        offsets.put(tp1, new OffsetAndMetadata(1));
+
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+            offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        TxnOffsetCommitResponseData responseData = new TxnOffsetCommitResponseData()
+            .setTopics(List.of(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(
+                    new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(tp0.partition())
+                        .setErrorCode(Errors.NONE.code()),
+                    new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(tp1.partition())
+                        .setErrorCode(Errors.NONE.code())))));
+        client.prepareResponse(request -> {
+            TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest) request;
+            assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true), txnRequest.version());
+            assertEquals(1, txnRequest.data().topics().size());
+            assertEquals(TOPIC_ID, txnRequest.data().topics().get(0).topicId());
+            return true;
+        }, new TxnOffsetCommitResponse(responseData));
+
+        runUntil(sendOffsetsResult::isCompleted);
+        assertTrue(sendOffsetsResult.isSuccessful());
+        assertFalse(transactionManager.hasPendingOffsetCommits());
+    }
+
+    @Test
+    public void testTxnOffsetCommitDowngradesToV5WhenAnyTopicIdIsMissing() {
+        initializeTransactionManager(Optional.of(transactionalId), true);
+        doInitTransactions();
+        transactionManager.beginTransaction();
+
+        // Only `topic` has a known id; `other` is not in the metadata cache,
+        // so the builder must downgrade to v5 (which encodes by name).
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+            1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+        TopicPartition otherTp = new TopicPartition("other", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(1));
+        offsets.put(otherTp, new OffsetAndMetadata(1));
+
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+            offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+        txnOffsetCommitResponse.put(tp0, Errors.NONE);
+        txnOffsetCommitResponse.put(otherTp, Errors.NONE);
+        client.prepareResponse(request -> {
+            TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest) request;
+            assertTrue(txnRequest.version() < 6,
+                "Expected downgrade to v0-5, got " + txnRequest.version());
+            txnRequest.data().topics().forEach(t -> assertFalse(t.name() == null || t.name().isEmpty()));
+            return true;
+        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+
+        runUntil(sendOffsetsResult::isCompleted);
+        assertTrue(sendOffsetsResult.isSuccessful());
+    }
+
+    @Test
+    public void testTxnOffsetCommitRetriesOnUnknownTopicIdAtV6() {
+        initializeTransactionManager(Optional.of(transactionalId), true);
+        doInitTransactions();
+        transactionManager.beginTransaction();
+
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+            1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(1));
+
+        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
+            offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
+        TxnOffsetCommitResponseData unknownTopicId = new TxnOffsetCommitResponseData()
+            .setTopics(List.of(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                    .setPartitionIndex(tp0.partition())
+                    .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())))));
+        client.prepareResponse(request -> {
+            TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest) request;
+            assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true), txnRequest.version());
+            return true;
+        }, new TxnOffsetCommitResponse(unknownTopicId));
+
+        runUntil(() -> client.inFlightRequestCount() == 0);
+        assertFalse(sendOffsetsResult.isCompleted(),
+            "UNKNOWN_TOPIC_ID is retriable; the commit should still be pending");
+        assertTrue(transactionManager.hasPendingOffsetCommits());
+        assertFalse(transactionManager.hasError());
+    }
+
     private void testFatalErrorInTxnOffsetCommit(final Errors error) {
         testFatalErrorInTxnOffsetCommit(error, error);
     }
@@ -4482,6 +4595,7 @@ public class TransactionManagerTest {
                                                 Map<TopicPartition, Errors> txnOffsetCommitResponse) {
         client.prepareResponse(request -> {
             TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
+            assertTxnOffsetCommitRequestUsesTopicNames(txnOffsetCommitRequest);
             assertEquals(consumerGroupId, txnOffsetCommitRequest.data().groupId());
             assertEquals(producerId, txnOffsetCommitRequest.data().producerId());
             assertEquals(producerEpoch, txnOffsetCommitRequest.data().producerEpoch());
@@ -4498,6 +4612,7 @@ public class TransactionManagerTest {
                                                 Map<TopicPartition, Errors> txnOffsetCommitResponse) {
         client.prepareResponse(request -> {
             TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
+            assertTxnOffsetCommitRequestUsesTopicNames(txnOffsetCommitRequest);
             assertEquals(consumerGroupId, txnOffsetCommitRequest.data().groupId());
             assertEquals(producerId, txnOffsetCommitRequest.data().producerId());
             assertEquals(producerEpoch, txnOffsetCommitRequest.data().producerEpoch());
@@ -4508,6 +4623,14 @@ public class TransactionManagerTest {
         }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
     }
 
+    private static void assertTxnOffsetCommitRequestUsesTopicNames(TxnOffsetCommitRequest request) {
+        assertTrue(request.version() < 6,
+            "Expected TxnOffsetCommit request at version < 6, got " + request.version());
+        request.data().topics().forEach(topic -> assertFalse(
+            topic.name() == null || topic.name().isEmpty(),
+            "Expected every request topic to carry a non-empty name at version " + request.version()));
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
         return produceResponse(tp, offset, error, throttleTimeMs, 10);
     }
@@ -4658,8 +4781,9 @@ public class TransactionManagerTest {
                                           int transactionTimeoutMs,
                                           long retryBackoffMs,
                                           ApiVersions apiVersions,
+                                          Metadata metadata,
                                           boolean enable2Pc) {
-            super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, enable2Pc);
+            super(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs, apiVersions, metadata, enable2Pc);
             this.shouldPoisonStateOnInvalidTransitionOverride = Optional.empty();
         }
 


Reply via email to