This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb111f6695e KAFKA-20310: Persist previousProducerId and nextProducerId
in transaction log (#21828)
eb111f6695e is described below
commit eb111f6695ef30889e7367bbad759f7e772d65ea
Author: Angelo R. <[email protected]>
AuthorDate: Sat Mar 28 10:45:48 2026 -0700
KAFKA-20310: Persist previousProducerId and nextProducerId in transaction log (#21828)
## Summary
`TransactionLog.valueToBytes()` does not set `PreviousProducerId` or
`NextProducerId` on the `TransactionLogValue` when serializing
transaction metadata. Both fields exist in the schema (tagged fields at
version 1+, tags 0 and 1) and are correctly read back by
`readTxnRecord()`, but because they are never written, they always
deserialize as `-1` (`NO_PRODUCER_ID`) whenever the transaction log is
reloaded, e.g. after a coordinator failover.
## Impact
This causes two failure modes described in KAFKA-20310:
1. **Coordinator failover during epoch exhaustion:** `nextProducerId` is
lost, so `prepareComplete()` cannot rotate to the new producer ID. The
producer is stuck at the exhausted epoch with no recovery path.
2. **Client retry after a failover that follows producer ID rotation:**
`prevProducerId` is lost, so the validation check
`prevProducerId == expectedProducerId` fails and the client receives
`PRODUCER_FENCED`.
## Fix
Set both fields in `valueToBytes()`, guarded by `logValueVersion >= 1`,
since they are tagged fields that exist only in the flexible versions
(1+). At version 0 the fields are left unset, so version 0 serialization
is unchanged and backward compatibility is preserved.
## Compatibility
These are tagged fields (tags 0 and 1) in the `TransactionLogValue`
schema. Tagged fields are forward/backward compatible by design:
- Older brokers that do not know these tags skip them as unknown tagged fields when reading version 1 records.
- Newer brokers reading version 0 logs see the default value (`-1`),
which is the existing behavior.
`NextProducerEpoch` (tag 3) was intentionally left out of this fix
because the read path currently hardcodes
`RecordBatch.NO_PRODUCER_EPOCH` rather than reading the field, making it
a separate behavioral discussion.
## Tests
Two new tests added to `TransactionLogTest`:
- `shouldRoundTripPreviousAndNextProducerIds` — verifies both fields
survive serialization at version 1+
- `shouldNotPersistProducerIdsAtVersion0` — verifies version 0
serialization is unchanged (fields default to `NO_PRODUCER_ID`)
All 16 existing tests continue to pass.
Reviewers: Justine Olshan <[email protected]>, chickenchickenlove
<[email protected]>
---
.../coordinator/transaction/TransactionLog.java | 15 ++++--
.../transaction/TransactionLogTest.java | 56 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
index a5b6e62476c..b63141ca5dc 100644
--- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -84,9 +84,8 @@ public class TransactionLog {
.setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
}
- return MessageUtil.toVersionPrefixedBytes(
- transactionVersionLevel.transactionLogValueVersion(),
- new TransactionLogValue()
+ short logValueVersion =
transactionVersionLevel.transactionLogValueVersion();
+ TransactionLogValue value = new TransactionLogValue()
.setProducerId(txnMetadata.producerId())
.setProducerEpoch(txnMetadata.producerEpoch())
.setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
@@ -94,8 +93,14 @@ public class TransactionLog {
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
.setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
.setTransactionPartitions(transactionPartitions)
-
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
- );
+
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel());
+
+ if (logValueVersion >= 1) {
+ value.setPreviousProducerId(txnMetadata.prevProducerId());
+ value.setNextProducerId(txnMetadata.nextProducerId());
+ }
+
+ return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
}
/**
diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
index dcec188c754..6dcce9b25b7 100644
--- a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
+++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
@@ -127,6 +127,62 @@ class TransactionLogTest {
}
}
+ @Test
+ void shouldRoundTripPreviousAndNextProducerIds() {
+ var txnTransitMetadata = new TxnTransitMetadata(
+ 200L, // producerId
+ 100L, // prevProducerId
+ 201L, // nextProducerId
+ (short) 5, // producerEpoch
+ (short) 4, // lastProducerEpoch
+ 1000, // txnTimeoutMs
+ TransactionState.PREPARE_COMMIT,
+ new HashSet<>(Set.of(new TopicPartition("topic", 0))),
+ 0L, // txnStartTimestamp
+ 0L, // txnLastUpdateTimestamp
+ TV_2
+ );
+
+ // Serialize to bytes and deserialize the raw TransactionLogValue
+ var bytes = TransactionLog.valueToBytes(txnTransitMetadata, TV_2);
+ var buffer = wrap(bytes);
+ buffer.getShort(); // skip version prefix
+ var value = new TransactionLogValue(new ByteBufferAccessor(buffer),
(short) 1);
+
+ assertEquals(200L, value.producerId());
+ assertEquals(100L, value.previousProducerId());
+ assertEquals(201L, value.nextProducerId());
+ }
+
+ @Test
+ void shouldNotPersistProducerIdsAtVersion0() {
+ // Version 0 is non-flexible, so tagged fields (previousProducerId,
+ // nextProducerId) cannot be written. They fall back to defaults (-1).
+ var txnTransitMetadata = new TxnTransitMetadata(
+ 200L, // producerId
+ 100L, // prevProducerId — not persisted at v0
+ 201L, // nextProducerId — not persisted at v0
+ (short) 5, // producerEpoch
+ (short) 4, // lastProducerEpoch
+ 1000, // txnTimeoutMs
+ TransactionState.PREPARE_COMMIT,
+ new HashSet<>(Set.of(new TopicPartition("topic", 0))),
+ 0L, // txnStartTimestamp
+ 0L, // txnLastUpdateTimestamp
+ TV_0
+ );
+
+ var record = MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord(
+ TransactionLog.keyToBytes("transactionalId"),
+ TransactionLog.valueToBytes(txnTransitMetadata, TV_0)
+ )).records().iterator().next();
+ var readResult = assertInstanceOf(TransactionLog.TxnRecord.class,
TransactionLog.read(record.key(), record.value()));
+ var deserialized = readResult.metadata();
+
+ assertEquals(200L, deserialized.producerId());
+ assertEquals(-1L, deserialized.prevProducerId());
+ }
+
@Test
void testSerializeTransactionLogValueToHighestNonFlexibleVersion() {
var txnTransitMetadata = new TxnTransitMetadata(1L, 1L, 1L, (short) 1,
(short) 1, 1000, TransactionState.COMPLETE_COMMIT, new HashSet<>(), 500L, 500L,
TV_0);