This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 947da292f50 KAFKA-20310: Persist previousProducerId and nextProducerId in transaction log (#21828)
947da292f50 is described below

commit 947da292f50074793fa7726151bed4c85106eed8
Author: Angelo R. <[email protected]>
AuthorDate: Sat Mar 28 10:45:48 2026 -0700

    KAFKA-20310: Persist previousProducerId and nextProducerId in transaction log (#21828)
    
    ## Summary
    
    `TransactionLog.valueToBytes()` does not set `PreviousProducerId` or
    `NextProducerId` on the `TransactionLogValue` when serializing
    transaction metadata. Both fields exist in the schema (tagged fields at
    version 1+, tags 0 and 1) and are correctly read back by
    `readTxnRecord()`, but because they are never written, they always
    deserialize as `-1` (`NO_PRODUCER_ID`) after coordinator failover.
    
    ## Impact
    
    This causes two failure modes described in KAFKA-20310:
    
    1. **Coordinator failover during epoch exhaustion:** `nextProducerId` is
    lost, so `prepareComplete()` cannot rotate to the new producer ID. The
    producer is stuck at the exhausted epoch with no recovery path.
    
    2. **Client retry after epoch rotation failover:** `prevProducerId` is
    lost, so the validation check `prevProducerId == expectedProducerId`
    fails. The client receives `PRODUCER_FENCED`.
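    
    As a rough illustration of both failure modes, the sketch below models
    a failover reload that drops the two IDs. `FailoverImpactSketch`, its
    `Metadata` class, and every method name are hypothetical and heavily
    simplified; they are not the coordinator's actual code.
    
    ```java
    // Hypothetical, simplified model for illustration only.
    final class FailoverImpactSketch {
        static final long NO_PRODUCER_ID = -1L;
    
        /** The subset of transaction metadata relevant to this illustration. */
        static final class Metadata {
            final long producerId;
            final long prevProducerId;
            final long nextProducerId;
    
            Metadata(long producerId, long prevProducerId, long nextProducerId) {
                this.producerId = producerId;
                this.prevProducerId = prevProducerId;
                this.nextProducerId = nextProducerId;
            }
        }
    
        /** Models reloading after failover when the two IDs were never written. */
        static Metadata reloadWithoutPersistedIds(Metadata m) {
            return new Metadata(m.producerId, NO_PRODUCER_ID, NO_PRODUCER_ID);
        }
    
        /** Failure mode 1: rotation needs a known next producer ID. */
        static boolean canRotate(Metadata m) {
            return m.nextProducerId != NO_PRODUCER_ID;
        }
    
        /** Failure mode 2: the check prevProducerId == expectedProducerId. */
        static boolean retryAccepted(Metadata m, long expectedProducerId) {
            return m.prevProducerId == expectedProducerId;
        }
    }
    ```
    
    In this model, a reloaded `Metadata` can neither rotate nor accept a
    retry that carries the pre-rotation producer ID, which mirrors the two
    symptoms listed above.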
    
    ## Fix
    
    Set both fields in `valueToBytes()`, guarded by `logValueVersion >= 1`
    since they are tagged fields only available in the flexible version
    (1+). At version 0, the fields are not set, preserving backward
    compatibility.
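    
    The guard can be sketched in isolation as follows. This is a minimal
    model, not the real serializer: `VersionGuardSketch` and its `Map`-based
    value are assumptions made for brevity, and only the
    `logValueVersion >= 1` condition and the `-1` default come from this
    commit.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    // Hypothetical stand-in for the value builder; the Map representation
    // is an assumption, not how TransactionLogValue is implemented.
    final class VersionGuardSketch {
        static final long NO_PRODUCER_ID = -1L;
    
        static Map<String, Long> buildValue(short logValueVersion, long producerId,
                                            long prevProducerId, long nextProducerId) {
            Map<String, Long> value = new LinkedHashMap<>();
            value.put("producerId", producerId);
            // Both tagged fields start at their default value.
            value.put("previousProducerId", NO_PRODUCER_ID);
            value.put("nextProducerId", NO_PRODUCER_ID);
            // Only version 1+ carries the tagged fields.
            if (logValueVersion >= 1) {
                value.put("previousProducerId", prevProducerId);
                value.put("nextProducerId", nextProducerId);
            }
            return value;
        }
    }
    ```
    
    With this shape, a version 0 value keeps both fields at the default
    regardless of what the in-memory metadata holds.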
    
    ## Compatibility
    
    These are tagged fields (tags 0 and 1) in the `TransactionLogValue`
    schema. Tagged fields are forward/backward compatible by design:
    - Older brokers reading version 1 logs ignore unknown tags.
    - Newer brokers reading version 0 logs see the default value (`-1`),
    which is the existing behavior.
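    
    The two compatibility cases can be modeled with a toy tag-to-value map.
    `TaggedFieldSketch` is hypothetical and says nothing about the actual
    wire encoding; only the tag numbers (0 and 1) and the `-1` default are
    taken from the description above.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    // Toy model of tagged fields: a map from tag number to value.
    final class TaggedFieldSketch {
        static final long NO_PRODUCER_ID = -1L;
        static final int TAG_PREVIOUS_PRODUCER_ID = 0;
        static final int TAG_NEXT_PRODUCER_ID = 1;
    
        /** Keeps only the tags this reader knows; unknown tags are skipped. */
        static Map<Integer, Long> readKnown(Map<Integer, Long> onWire, Set<Integer> knownTags) {
            Map<Integer, Long> result = new HashMap<>();
            for (Map.Entry<Integer, Long> e : onWire.entrySet()) {
                if (knownTags.contains(e.getKey())) {
                    result.put(e.getKey(), e.getValue());
                }
            }
            return result;
        }
    
        /** Returns the tagged value, or the default when the tag is absent. */
        static long valueOrDefault(Map<Integer, Long> fields, int tag) {
            return fields.getOrDefault(tag, NO_PRODUCER_ID);
        }
    }
    ```
    
    A reader with an empty `knownTags` set plays the role of the older
    broker, and an empty `onWire` map plays the role of a version 0 record.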
    
    `NextProducerEpoch` (tag 3) was intentionally left out of this fix
    because the read path currently hardcodes
    `RecordBatch.NO_PRODUCER_EPOCH` rather than reading the field, making it
    a separate behavioral discussion.
    
    ## Tests
    
    Two new tests added to `TransactionLogTest`:
    - `shouldRoundTripPreviousAndNextProducerIds` — verifies both fields
    survive serialization at version 1+
    - `shouldNotPersistProducerIdsAtVersion0` — verifies version 0
    serialization is unchanged (fields default to `NO_PRODUCER_ID`)
    
    All 16 existing tests continue to pass.
    
    Reviewers: Justine Olshan <[email protected]>, chickenchickenlove <[email protected]>
---
 .../coordinator/transaction/TransactionLog.java    | 15 ++++--
 .../transaction/TransactionLogTest.java            | 56 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
index a5b6e62476c..b63141ca5dc 100644
--- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -84,9 +84,8 @@ public class TransactionLog {
                             .setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
         }
 
-        return MessageUtil.toVersionPrefixedBytes(
-                transactionVersionLevel.transactionLogValueVersion(),
-                new TransactionLogValue()
+        short logValueVersion = transactionVersionLevel.transactionLogValueVersion();
+        TransactionLogValue value = new TransactionLogValue()
                         .setProducerId(txnMetadata.producerId())
                         .setProducerEpoch(txnMetadata.producerEpoch())
                         .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
@@ -94,8 +93,14 @@ public class TransactionLog {
                         .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
                         .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
                         .setTransactionPartitions(transactionPartitions)
-                        .setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
-        );
+                        .setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel());
+
+        if (logValueVersion >= 1) {
+            value.setPreviousProducerId(txnMetadata.prevProducerId());
+            value.setNextProducerId(txnMetadata.nextProducerId());
+        }
+
+        return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
     }
 
     /**
diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
index dcec188c754..6dcce9b25b7 100644
--- a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
+++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
@@ -127,6 +127,62 @@ class TransactionLogTest {
         }
     }
   
+    @Test
+    void shouldRoundTripPreviousAndNextProducerIds() {
+        var txnTransitMetadata = new TxnTransitMetadata(
+            200L,       // producerId
+            100L,       // prevProducerId
+            201L,       // nextProducerId
+            (short) 5,  // producerEpoch
+            (short) 4,  // lastProducerEpoch
+            1000,       // txnTimeoutMs
+            TransactionState.PREPARE_COMMIT,
+            new HashSet<>(Set.of(new TopicPartition("topic", 0))),
+            0L,         // txnStartTimestamp
+            0L,         // txnLastUpdateTimestamp
+            TV_2
+        );
+
+        // Serialize to bytes and deserialize the raw TransactionLogValue
+        var bytes = TransactionLog.valueToBytes(txnTransitMetadata, TV_2);
+        var buffer = wrap(bytes);
+        buffer.getShort(); // skip version prefix
+        var value = new TransactionLogValue(new ByteBufferAccessor(buffer), (short) 1);
+
+        assertEquals(200L, value.producerId());
+        assertEquals(100L, value.previousProducerId());
+        assertEquals(201L, value.nextProducerId());
+    }
+
+    @Test
+    void shouldNotPersistProducerIdsAtVersion0() {
+        // Version 0 is non-flexible, so tagged fields (previousProducerId,
+        // nextProducerId) cannot be written. They fall back to defaults (-1).
+        var txnTransitMetadata = new TxnTransitMetadata(
+            200L,       // producerId
+            100L,       // prevProducerId — not persisted at v0
+            201L,       // nextProducerId — not persisted at v0
+            (short) 5,  // producerEpoch
+            (short) 4,  // lastProducerEpoch
+            1000,       // txnTimeoutMs
+            TransactionState.PREPARE_COMMIT,
+            new HashSet<>(Set.of(new TopicPartition("topic", 0))),
+            0L,         // txnStartTimestamp
+            0L,         // txnLastUpdateTimestamp
+            TV_0
+        );
+
+        var record = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(
+            TransactionLog.keyToBytes("transactionalId"),
+            TransactionLog.valueToBytes(txnTransitMetadata, TV_0)
+        )).records().iterator().next();
+        var readResult = assertInstanceOf(TransactionLog.TxnRecord.class, TransactionLog.read(record.key(), record.value()));
+        var deserialized = readResult.metadata();
+
+        assertEquals(200L, deserialized.producerId());
+        assertEquals(-1L, deserialized.prevProducerId());
+    }
+
     @Test
     void testSerializeTransactionLogValueToHighestNonFlexibleVersion() {
         var txnTransitMetadata = new TxnTransitMetadata(1L, 1L, 1L, (short) 1, (short) 1, 1000, TransactionState.COMPLETE_COMMIT, new HashSet<>(), 500L, 500L, TV_0);
