This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 83976543fef KAFKA-20444: [10/N] Wire topic IDs through
TransactionManager (KIP-1319) (#22443)
83976543fef is described below
commit 83976543fef1fe942bf021c8eec787c75dd58bdd
Author: David Jacot <[email protected]>
AuthorDate: Tue Jun 2 16:33:29 2026 +0200
KAFKA-20444: [10/N] Wire topic IDs through TransactionManager (KIP-1319)
(#22443)
This patch lets `TxnOffsetCommit` requests negotiate v6+ (KIP-1319) when
the producer's metadata cache knows the topic ids for every topic in the
offsets map.
Key changes:
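- `TransactionManager` gains a `Metadata` reference (constructor
parameter wired from `KafkaProducer`). It is used to look up topic
ids when building a `TxnOffsetCommit` request. To have the reference
available, the `KafkaProducer` constructor now initializes
`this.metadata` before calling `configureTransactionState`.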
- `txnOffsetCommitHandler` makes a single pass over the offsets map:
it populates `pendingTxnOffsetCommits`, builds each request topic
(via `computeIfAbsent` over a name -> topic map, with the topic id
resolved from `metadata.topicIds()`), and accumulates a snapshot
`Map<Uuid, String> topicNamesByIds` for the response handler. When
every topic has a non-zero id, the builder is chosen via
`forTopicIdsOrNames(...)` so v6 can be negotiated; otherwise it
falls back to `forTopicNames(...)` and caps at v5.
- The response handler resolves the topic name from the snapshot
(not the live metadata cache) when a v6+ response topic omits the
name on the wire. If the id is unknown to the snapshot, the
handler logs a warning and skips that topic without touching
`pendingTxnOffsetCommits`, so the retry loop is unaffected.
Test changes:
- `prepareTxnOffsetCommitResponse` in `TransactionManagerTest` now
asserts that the inspected request is at version < 6 and that
every request topic carries a non-empty name. This strengthens the
existing v5-path coverage.
- Three new tests cover the v6 path. They seed the metadata cache
with a known topic id via `RequestTestUtils.metadataUpdateWithIds`
and then assert the request's negotiated version, topic ids, and
error handling:
`testTxnOffsetCommitNegotiatesV6WhenAllTopicIdsAreAvailable`,
`testTxnOffsetCommitDowngradesToV5WhenAnyTopicIdIsMissing`, and
`testTxnOffsetCommitRetriesOnUnknownTopicIdAtV6`.
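`KafkaProducer.sendOffsetsToTransaction` will be updated in a follow-up
to refresh the metadata cache before delegating, so the v6 negotiation
actually fires for topics that the producer has not seen yet. Until
then, such topics resolve to a zero id and the request falls back to
`forTopicNames(...)`, even though the new comment in
`txnOffsetCommitHandler` already describes the cache as fresh.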
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 27 ++---
.../producer/internals/TransactionManager.java | 84 ++++++++++---
.../clients/producer/internals/SenderTest.java | 39 ++++---
.../producer/internals/TransactionManagerTest.java | 130 ++++++++++++++++++++-
4 files changed, 231 insertions(+), 49 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 63303f8e089..cd495956269 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,6 +427,19 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = apiVersions;
+ List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
+ if (metadata != null) {
+ this.metadata = metadata;
+ } else {
+ this.metadata = new ProducerMetadata(retryBackoffMs,
+ retryBackoffMaxMs,
+ config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
+
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
+ logContext,
+ clusterResourceListeners,
+ Time.SYSTEM);
+ this.metadata.bootstrap(addresses);
+ }
this.transactionManager = configureTransactionState(config,
logContext);
// There is no need to do work required for adaptive partitioning,
if we use a custom partitioner.
boolean enableAdaptivePartitioning = partitionerPlugin.get() ==
null &&
@@ -454,19 +467,6 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics,
time, PRODUCER_METRIC_GROUP_NAME));
- List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
- if (metadata != null) {
- this.metadata = metadata;
- } else {
- this.metadata = new ProducerMetadata(retryBackoffMs,
- retryBackoffMaxMs,
- config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
-
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
- logContext,
- clusterResourceListeners,
- Time.SYSTEM);
- this.metadata.bootstrap(addresses);
- }
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
@@ -622,6 +622,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
transactionTimeoutMs,
retryBackoffMs,
apiVersions,
+ metadata,
enable2PC
);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 2b10ceac6f3..266bdeb0270 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -49,6 +51,7 @@ import
org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import
org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -100,6 +103,7 @@ public class TransactionManager {
private final String transactionalId;
private final int transactionTimeoutMs;
private final ApiVersions apiVersions;
+ private final Metadata metadata;
private final TxnPartitionMap txnPartitionMap;
@@ -231,6 +235,7 @@ public class TransactionManager {
final int transactionTimeoutMs,
final long retryBackoffMs,
final ApiVersions apiVersions,
+ final Metadata metadata,
final boolean enable2PC) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
@@ -248,6 +253,7 @@ public class TransactionManager {
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
+ this.metadata = metadata;
this.enable2PC = enable2PC;
}
@@ -1243,14 +1249,43 @@ public class TransactionManager {
private TxnOffsetCommitHandler
txnOffsetCommitHandler(TransactionalRequestResult result,
Map<TopicPartition,
OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) {
- for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
offsets.entrySet()) {
- OffsetAndMetadata offsetAndMetadata = entry.getValue();
- CommittedOffset committedOffset = new
CommittedOffset(offsetAndMetadata.offset(),
- offsetAndMetadata.metadata(),
offsetAndMetadata.leaderEpoch());
- pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
+ // Resolve topic ids from the metadata cache at request build time.
+ // KafkaProducer.sendOffsetsToTransaction has already ensured the cache
+ // is fresh for these topics, so this is a non-blocking lookup.
+ var topicIds = metadata.topicIds();
+ var requestTopicsByName = new HashMap<String,
TxnOffsetCommitRequestTopic>();
+ var topicNamesByIds = new HashMap<Uuid, String>();
+ var topics = new ArrayList<TxnOffsetCommitRequestTopic>();
+ var allHaveTopicIds = true;
+ for (var entry : offsets.entrySet()) {
+ var tp = entry.getKey();
+ var offsetAndMetadata = entry.getValue();
+ pendingTxnOffsetCommits.put(
+ tp,
+ new CommittedOffset(
+ offsetAndMetadata.offset(),
+ offsetAndMetadata.metadata(),
+ offsetAndMetadata.leaderEpoch()
+ )
+ );
+ var topicId = topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID);
+ allHaveTopicIds &= !topicId.equals(Uuid.ZERO_UUID);
+ var topic = requestTopicsByName.computeIfAbsent(tp.topic(), name
-> {
+ var t = new
TxnOffsetCommitRequestTopic().setName(name).setTopicId(topicId);
+ topics.add(t);
+ if (!topicId.equals(Uuid.ZERO_UUID)) {
+ topicNamesByIds.put(topicId, name);
+ }
+ return t;
+ });
+ topic.partitions().add(new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(tp.partition())
+ .setCommittedOffset(offsetAndMetadata.offset())
+
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+ .setCommittedMetadata(offsetAndMetadata.metadata()));
}
- final TxnOffsetCommitRequestData data = new
TxnOffsetCommitRequestData()
+ var data = new TxnOffsetCommitRequestData()
.setTransactionalId(transactionalId)
.setGroupId(groupMetadata.groupId())
.setProducerId(producerIdAndEpoch.producerId)
@@ -1258,14 +1293,15 @@ public class TransactionManager {
.setMemberId(groupMetadata.memberId())
.setGenerationIdOrMemberEpoch(groupMetadata.generationId())
.setGroupInstanceId(groupMetadata.groupInstanceId().orElse(null))
-
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
- final TxnOffsetCommitRequest.Builder builder =
- TxnOffsetCommitRequest.Builder.forTopicNames(data,
isTransactionV2Enabled());
+ .setTopics(topics);
+ var builder = allHaveTopicIds
+ ? TxnOffsetCommitRequest.Builder.forTopicIdsOrNames(data,
isTransactionV2Enabled(), true)
+ : TxnOffsetCommitRequest.Builder.forTopicNames(data,
isTransactionV2Enabled());
if (result == null) {
// In this case, transaction V2 is in use.
- return new TxnOffsetCommitHandler(builder);
+ return new TxnOffsetCommitHandler(builder, topicNamesByIds);
}
- return new TxnOffsetCommitHandler(result, builder);
+ return new TxnOffsetCommitHandler(result, builder, topicNamesByIds);
}
private void throwIfPendingState(TransactionOperation operation) {
@@ -1877,16 +1913,25 @@ public class TransactionManager {
private class TxnOffsetCommitHandler extends TxnRequestHandler {
private final TxnOffsetCommitRequest.Builder builder;
+ // Snapshot of the topic-id -> name map captured when the request was
+ // built. v6+ responses omit the topic name on the wire, so we resolve
+ // it via this snapshot rather than the live metadata cache (which may
+ // have changed by the time the response arrives).
+ private final Map<Uuid, String> topicNamesByIds;
private TxnOffsetCommitHandler(TransactionalRequestResult result,
- TxnOffsetCommitRequest.Builder builder)
{
+ TxnOffsetCommitRequest.Builder builder,
+ Map<Uuid, String> topicNamesByIds) {
super(result);
this.builder = builder;
+ this.topicNamesByIds = topicNamesByIds;
}
- private TxnOffsetCommitHandler(TxnOffsetCommitRequest.Builder builder)
{
+ private TxnOffsetCommitHandler(TxnOffsetCommitRequest.Builder builder,
+ Map<Uuid, String> topicNamesByIds) {
super("TxnOffsetCommitHandler");
this.builder = builder;
+ this.topicNamesByIds = topicNamesByIds;
}
@Override
@@ -1909,6 +1954,7 @@ public class TransactionManager {
return builder.data.groupId();
}
+ @SuppressWarnings({"checkstyle:NPathComplexity"})
@Override
public void handleResponse(AbstractResponse response) {
TxnOffsetCommitResponse txnOffsetCommitResponse =
(TxnOffsetCommitResponse) response;
@@ -1918,8 +1964,18 @@ public class TransactionManager {
builder.data.groupId(),
txnOffsetCommitResponse.data().topics());
for (TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic
responseTopic : txnOffsetCommitResponse.data().topics()) {
+ // v6+ responses omit the topic name on the wire; fall back to
+ // the snapshot taken at request build time.
+ String topicName = responseTopic.name() != null &&
!responseTopic.name().isEmpty()
+ ? responseTopic.name()
+ : topicNamesByIds.get(responseTopic.topicId());
+ if (topicName == null) {
+ log.warn("Received TxnOffsetCommit response for consumer
group {} with an unknown topic id {}",
+ builder.data.groupId(), responseTopic.topicId());
+ continue;
+ }
for
(TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition responsePartition
: responseTopic.partitions()) {
- TopicPartition topicPartition = new
TopicPartition(responseTopic.name(), responsePartition.partitionIndex());
+ TopicPartition topicPartition = new
TopicPartition(topicName, responsePartition.partitionIndex());
Errors error =
Errors.forCode(responsePartition.errorCode());
if (error == Errors.NONE) {
pendingTxnOffsetCommits.remove(topicPartition);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 2a4908c1e46..a91adc502ef 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -514,7 +514,7 @@ public class SenderTest {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -642,7 +642,7 @@ public class SenderTest {
// Initialize transaction manager. InitProducerId will be queued up
until metadata response
// is processed and FindCoordinator can be sent to `leastLoadedNode`.
TransactionManager transactionManager = new TransactionManager(new
LogContext(), "testInitProducerIdWithPendingMetadataRequest",
- 60000, 100L, new ApiVersions(), false);
+ 60000, 100L, new ApiVersions(), metadata, false);
setupWithTransactionState(transactionManager, false, null, false);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(producerId, (short) 0);
transactionManager.initializeTransactions(false);
@@ -694,7 +694,7 @@ public class SenderTest {
client = new MockClient(time, metadata);
TransactionManager transactionManager = new TransactionManager(new
LogContext(), "testNodeNotReady",
- 60000, 100L, new ApiVersions(), false);
+ 60000, 100L, new ApiVersions(), metadata, false);
setupWithTransactionState(transactionManager, false, null, true);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(producerId, (short) 0);
transactionManager.initializeTransactions(false);
@@ -1536,7 +1536,7 @@ public class SenderTest {
public void testUnresolvedSequencesAreNotFatal() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -1821,7 +1821,7 @@ public class SenderTest {
@Test
public void
testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws
Exception {
final long producerId = 343434L;
- TransactionManager transactionManager = new
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions,
false);
+ TransactionManager transactionManager = new
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions,
metadata, false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new
ProducerIdAndEpoch(producerId, (short) 0));
@@ -2390,7 +2390,7 @@ public class SenderTest {
TOPIC_IDS.getOrDefault("testSplitBatchAndSend",
Uuid.ZERO_UUID),
new TopicPartition("testSplitBatchAndSend", 1));
- TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2742,7 +2742,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, metadata,
false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2775,7 +2775,7 @@ public class SenderTest {
int lingerMs = 50;
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager, lingerMs);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2832,7 +2832,7 @@ public class SenderTest {
try (Metrics m = new Metrics()) {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2903,7 +2903,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, metadata,
false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2937,7 +2937,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, metadata,
false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2968,7 +2968,7 @@ public class SenderTest {
@Test
public void testTransactionAbortedExceptionOnAbortWithoutError() throws
InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions,
false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions,
metadata, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2994,7 +2994,7 @@ public class SenderTest {
public void testDoNotPollWhenNoRequestSent() {
client = spy(new MockClient(time, metadata));
- TransactionManager txnManager = new TransactionManager(logContext,
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, metadata, false);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3006,7 +3006,7 @@ public class SenderTest {
@Test
public void testTooLargeBatchesAreSafelyRemoved() throws
InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3059,6 +3059,7 @@ public class SenderTest {
60000,
RETRY_BACKOFF_MS,
apiVersions,
+ metadata,
false
);
@@ -3129,7 +3130,7 @@ public class SenderTest {
public void testReceiveFailedBatchTwiceWithTransactions() throws Exception
{
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testFailTwice", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testFailTwice", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3179,7 +3180,7 @@ public class SenderTest {
public void testInvalidTxnStateIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testInvalidTxnState", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testInvalidTxnState", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3218,7 +3219,7 @@ public class SenderTest {
public void testTransactionAbortableExceptionIsAnAbortableError() throws
Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"textTransactionAbortableException", 60000, 100, apiVersions, false);
+ TransactionManager txnManager = new TransactionManager(logContext,
"textTransactionAbortableException", 60000, 100, apiVersions, metadata, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3257,7 +3258,7 @@ public class SenderTest {
public void testAbortableErrorIsConvertedToFatalErrorDuringAbort() throws
Exception {
// Initialize and begin transaction
- TransactionManager transactionManager = new
TransactionManager(logContext,
"testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100,
apiVersions, false);
+ TransactionManager transactionManager = new
TransactionManager(logContext,
"testAbortableErrorIsConvertedToFatalErrorDuringAbort", 60000, 100,
apiVersions, metadata, false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new ProducerIdAndEpoch(1L,
(short) 0));
transactionManager.beginTransaction();
@@ -3818,7 +3819,7 @@ public class SenderTest {
}
private TransactionManager createTransactionManager() {
- return new TransactionManager(new LogContext(), null, 0,
RETRY_BACKOFF_MS, new ApiVersions(), false);
+ return new TransactionManager(new LogContext(), null, 0,
RETRY_BACKOFF_MS, new ApiVersions(), metadata, false);
}
private void setupWithTransactionState(TransactionManager
transactionManager) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index db76ee022b6..a85041d955f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
@@ -51,6 +52,7 @@ import
org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -209,7 +211,7 @@ public class TransactionManagerTest {
finalizedFeaturesEpoch));
finalizedFeaturesEpoch += 1;
this.transactionManager = new TestableTransactionManager(logContext,
transactionalId.orElse(null),
- transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions,
enable2pc);
+ transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions,
this.metadata, enable2pc);
int batchSize = 16 * 1024;
@@ -1060,7 +1062,7 @@ public class TransactionManagerTest {
.setMinVersionLevel((short) 1)),
0));
this.transactionManager = new TestableTransactionManager(logContext,
transactionalId,
- transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions,
false);
+ transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions,
this.metadata, false);
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
@@ -2585,6 +2587,117 @@ public class TransactionManagerTest {
testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
}
+ @Test
+ public void testTxnOffsetCommitNegotiatesV6WhenAllTopicIdsAreAvailable() {
+ initializeTransactionManager(Optional.of(transactionalId), true);
+ doInitTransactions();
+ transactionManager.beginTransaction();
+
+ // Seed the metadata cache with the topic id so the request can
negotiate v6.
+ client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+ 1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(1));
+ offsets.put(tp1, new OffsetAndMetadata(1));
+
+ TransactionalRequestResult sendOffsetsResult =
transactionManager.sendOffsetsToTransaction(
+ offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+ prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.GROUP, consumerGroupId);
+ TxnOffsetCommitResponseData responseData = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(tp0.partition())
+ .setErrorCode(Errors.NONE.code()),
+ new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(tp1.partition())
+ .setErrorCode(Errors.NONE.code())))));
+ client.prepareResponse(request -> {
+ TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest)
request;
+ assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true),
txnRequest.version());
+ assertEquals(1, txnRequest.data().topics().size());
+ assertEquals(TOPIC_ID,
txnRequest.data().topics().get(0).topicId());
+ return true;
+ }, new TxnOffsetCommitResponse(responseData));
+
+ runUntil(sendOffsetsResult::isCompleted);
+ assertTrue(sendOffsetsResult.isSuccessful());
+ assertFalse(transactionManager.hasPendingOffsetCommits());
+ }
+
+ @Test
+ public void testTxnOffsetCommitDowngradesToV5WhenAnyTopicIdIsMissing() {
+ initializeTransactionManager(Optional.of(transactionalId), true);
+ doInitTransactions();
+ transactionManager.beginTransaction();
+
+ // Only `topic` has a known id; `other` is not in the metadata cache,
+ // so the builder must downgrade to v5 (which encodes by name).
+ client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+ 1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+ TopicPartition otherTp = new TopicPartition("other", 0);
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(1));
+ offsets.put(otherTp, new OffsetAndMetadata(1));
+
+ TransactionalRequestResult sendOffsetsResult =
transactionManager.sendOffsetsToTransaction(
+ offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+ prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.GROUP, consumerGroupId);
+ Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+ txnOffsetCommitResponse.put(tp0, Errors.NONE);
+ txnOffsetCommitResponse.put(otherTp, Errors.NONE);
+ client.prepareResponse(request -> {
+ TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest)
request;
+ assertTrue(txnRequest.version() < 6,
+ "Expected downgrade to v0-5, got " + txnRequest.version());
+ txnRequest.data().topics().forEach(t -> assertFalse(t.name() ==
null || t.name().isEmpty()));
+ return true;
+ }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+
+ runUntil(sendOffsetsResult::isCompleted);
+ assertTrue(sendOffsetsResult.isSuccessful());
+ }
+
+ @Test
+ public void testTxnOffsetCommitRetriesOnUnknownTopicIdAtV6() {
+ initializeTransactionManager(Optional.of(transactionalId), true);
+ doInitTransactions();
+ transactionManager.beginTransaction();
+
+ client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+ 1, singletonMap(topic, 2), Map.of(topic, TOPIC_ID)));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp0, new OffsetAndMetadata(1));
+
+ TransactionalRequestResult sendOffsetsResult =
transactionManager.sendOffsetsToTransaction(
+ offsets, new ConsumerGroupMetadata(consumerGroupId));
+
+ prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.GROUP, consumerGroupId);
+ TxnOffsetCommitResponseData unknownTopicId = new
TxnOffsetCommitResponseData()
+ .setTopics(List.of(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(tp0.partition())
+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code())))));
+ client.prepareResponse(request -> {
+ TxnOffsetCommitRequest txnRequest = (TxnOffsetCommitRequest)
request;
+ assertEquals(ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true),
txnRequest.version());
+ return true;
+ }, new TxnOffsetCommitResponse(unknownTopicId));
+
+ runUntil(() -> client.inFlightRequestCount() == 0);
+ assertFalse(sendOffsetsResult.isCompleted(),
+ "UNKNOWN_TOPIC_ID is retriable; the commit should still be
pending");
+ assertTrue(transactionManager.hasPendingOffsetCommits());
+ assertFalse(transactionManager.hasError());
+ }
+
private void testFatalErrorInTxnOffsetCommit(final Errors error) {
testFatalErrorInTxnOffsetCommit(error, error);
}
@@ -4482,6 +4595,7 @@ public class TransactionManagerTest {
Map<TopicPartition, Errors>
txnOffsetCommitResponse) {
client.prepareResponse(request -> {
TxnOffsetCommitRequest txnOffsetCommitRequest =
(TxnOffsetCommitRequest) request;
+ assertTxnOffsetCommitRequestUsesTopicNames(txnOffsetCommitRequest);
assertEquals(consumerGroupId,
txnOffsetCommitRequest.data().groupId());
assertEquals(producerId,
txnOffsetCommitRequest.data().producerId());
assertEquals(producerEpoch,
txnOffsetCommitRequest.data().producerEpoch());
@@ -4498,6 +4612,7 @@ public class TransactionManagerTest {
Map<TopicPartition, Errors>
txnOffsetCommitResponse) {
client.prepareResponse(request -> {
TxnOffsetCommitRequest txnOffsetCommitRequest =
(TxnOffsetCommitRequest) request;
+ assertTxnOffsetCommitRequestUsesTopicNames(txnOffsetCommitRequest);
assertEquals(consumerGroupId,
txnOffsetCommitRequest.data().groupId());
assertEquals(producerId,
txnOffsetCommitRequest.data().producerId());
assertEquals(producerEpoch,
txnOffsetCommitRequest.data().producerEpoch());
@@ -4508,6 +4623,14 @@ public class TransactionManagerTest {
}, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
}
+ private static void
assertTxnOffsetCommitRequestUsesTopicNames(TxnOffsetCommitRequest request) {
+ assertTrue(request.version() < 6,
+ "Expected TxnOffsetCommit request at version < 6, got " +
request.version());
+ request.data().topics().forEach(topic -> assertFalse(
+ topic.name() == null || topic.name().isEmpty(),
+ "Expected every request topic to carry a non-empty name at version
" + request.version()));
+ }
+
private ProduceResponse produceResponse(TopicPartition tp, long offset,
Errors error, int throttleTimeMs) {
return produceResponse(tp, offset, error, throttleTimeMs, 10);
}
@@ -4658,8 +4781,9 @@ public class TransactionManagerTest {
int transactionTimeoutMs,
long retryBackoffMs,
ApiVersions apiVersions,
+ Metadata metadata,
boolean enable2Pc) {
- super(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions, enable2Pc);
+ super(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions, metadata, enable2Pc);
this.shouldPoisonStateOnInvalidTransitionOverride =
Optional.empty();
}