This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70dfc4236cd KAFKA-19562 Replace hand-written AbortedTxn with generated protocol (#21577)
70dfc4236cd is described below

commit 70dfc4236cd865ba100ae56280fd7e60b8cde81f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Tue Mar 24 01:47:21 2026 +0800

    KAFKA-19562 Replace hand-written AbortedTxn with generated protocol (#21577)
    
    Replace the manually maintained AbortedTxn class with a generated
    protocol message defined in AbortedTxn.json.
    
    This also simplifies TransactionIndex buffer management: because the
    generated class copies field values at construction time instead of
    holding a buffer reference, the index iterator can reuse one read buffer.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/resources/common/message/AbortedTxn.json  |  36 ++++++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   7 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |   9 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   3 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   7 +-
 .../log/remote/storage/RemoteLogManager.java       |   6 +-
 .../kafka/storage/internals/log/AbortedTxn.java    | 117 --------------------
 .../internals/log/CleanedTransactionMetadata.java  |   1 +
 .../kafka/storage/internals/log/Cleaner.java       |   1 +
 .../kafka/storage/internals/log/LocalLog.java      |   5 +-
 .../kafka/storage/internals/log/LogSegment.java    |   7 +-
 .../storage/internals/log/TransactionIndex.java    |  51 ++++-----
 .../internals/log/TxnIndexSearchResult.java        |   2 +
 .../kafka/storage/internals/log/UnifiedLog.java    |   1 +
 .../storage/internals/log/LogSegmentTest.java      |   1 +
 .../internals/log/RemoteIndexCacheTest.java        |   9 +-
 .../internals/log/TransactionIndexTest.java        | 122 ++++++++++++++++-----
 .../storage/internals/log/UnifiedLogTest.java      |   5 +-
 .../org/apache/kafka/tools/DumpLogSegments.java    |   4 +-
 .../apache/kafka/tools/DumpLogSegmentsTest.java    |   6 +-
 20 files changed, 203 insertions(+), 197 deletions(-)

diff --git a/clients/src/main/resources/common/message/AbortedTxn.json 
b/clients/src/main/resources/common/message/AbortedTxn.json
new file mode 100644
index 00000000000..bcd84dac953
--- /dev/null
+++ b/clients/src/main/resources/common/message/AbortedTxn.json
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "AbortedTxn",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    { "name": "ProducerId", "type": "int64", "versions": "0+",
+      "about": "The producer id associated with the aborted transaction"
+    },
+    { "name": "FirstOffset", "type": "int64", "versions": "0+",
+      "about": "The first offset in the aborted transaction"
+    },
+    { "name": "LastOffset", "type": "int64", "versions": "0+",
+      "about": "The last offset in the aborted transaction"
+    },
+    { "name": "LastStableOffset", "type": "int64", "versions": "0+",
+      "about": "The last stable offset at the time the transaction was aborted"
+    }
+    // Note: adding new fields may require TransactionIndex to be refactored to read version-per-record.
+  ]
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0485f36b0ab..926aa14ddef 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,7 +30,8 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, 
LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, 
LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, 
ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin, 
CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, 
LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, 
LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, 
ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
@@ -417,8 +418,8 @@ class LogCleanerTest extends Logging {
     assertEquals(20L, log.logEndOffset)
 
     val expectedAbortedTxns = util.List.of(
-      new AbortedTxn(producerId1, 8, 10, 11),
-      new AbortedTxn(producerId2, 11, 16, 17)
+      new 
AbortedTxn().setProducerId(producerId1).setFirstOffset(8).setLastOffset(10).setLastStableOffset(11),
+      new 
AbortedTxn().setProducerId(producerId2).setFirstOffset(11).setLastOffset(16).setLastStableOffset(17)
     )
 
     assertAllTransactionsComplete(log)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c479ff1c174..e65ba72a7a4 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.metadata.MockConfigRepository
 import org.apache.kafka.server.common.TransactionVersion
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, 
EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, 
ProducerStateManagerConfig, SnapshotFile, UnifiedLog}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{CleanerConfig, EpochEntry, 
LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, 
ProducerStateManagerConfig, SnapshotFile, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, 
assertFalse, assertNotEquals, assertThrows, assertTrue}
@@ -1281,7 +1282,7 @@ class LogLoaderTest {
     val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean = 
false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
-    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
+    assertEquals(List(new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
 new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
 abortedTransactions)
   }
 
   @Test
@@ -1332,7 +1333,7 @@ class LogLoaderTest {
     val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = 
recoveryPoint, lastShutdownClean = false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
-    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
+    assertEquals(List(new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
 new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
 abortedTransactions)
   }
 
   @Test
@@ -1386,7 +1387,7 @@ class LogLoaderTest {
     val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 
5)
     val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = 
recoveryPoint, lastShutdownClean = false)
     val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
-    assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 
8L, 74L, 36L)), abortedTransactions)
+    assertEquals(List(new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
 new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
 abortedTransactions)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 5b4fc1d41e4..9875a3b2ce4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -37,7 +37,8 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
 import 
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
 DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, 
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogOffsetsListener, LogSegment, ProducerStateManager, 
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 65a41b2579c..18c1f29002f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,8 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, 
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, 
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -128,8 +129,8 @@ class UnifiedLogTest {
 
     val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
     val expectedTransactions = List(
-      new AbortedTxn(pid1, 0L, 29L, 8L),
-      new AbortedTxn(pid2, 8L, 74L, 36L)
+      new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+      new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
     )
 
     assertEquals(expectedTransactions, abortedTransactions)
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 27713827fef..cb2764f142b 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.internals.SecurityManagerCompatibility;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Quota;
@@ -60,7 +61,6 @@ import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
 import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
 import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -1893,7 +1893,9 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
 
         Consumer<List<AbortedTxn>> accumulator =
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
-                        .map(AbortedTxn::asAbortedTransaction).toList());
+                        .map(txn -> new FetchResponseData.AbortedTransaction()
+                                .setProducerId(txn.producerId())
+                                .setFirstOffset(txn.firstOffset())).toList());
 
         long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator, log);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java
deleted file mode 100644
index 05217d97ba1..00000000000
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.storage.internals.log;
-
-import org.apache.kafka.common.message.FetchResponseData;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class AbortedTxn {
-    static final int VERSION_OFFSET = 0;
-    static final int VERSION_SIZE = 2;
-    static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE;
-    static final int PRODUCER_ID_SIZE = 8;
-    static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET + 
PRODUCER_ID_SIZE;
-    static final int FIRST_OFFSET_SIZE = 8;
-    static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET + 
FIRST_OFFSET_SIZE;
-    static final int LAST_OFFSET_SIZE = 8;
-    static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET + 
LAST_OFFSET_SIZE;
-    static final int LAST_STABLE_OFFSET_SIZE = 8;
-    static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET + 
LAST_STABLE_OFFSET_SIZE;
-
-    public static final short CURRENT_VERSION = 0;
-
-    final ByteBuffer buffer;
-
-    AbortedTxn(ByteBuffer buffer) {
-        Objects.requireNonNull(buffer);
-        this.buffer = buffer;
-    }
-
-    public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) {
-        this(completedTxn.producerId(), completedTxn.firstOffset(), 
completedTxn.lastOffset(), lastStableOffset);
-    }
-
-    public AbortedTxn(long producerId, long firstOffset, long lastOffset, long 
lastStableOffset) {
-        this(toByteBuffer(producerId, firstOffset, lastOffset, 
lastStableOffset));
-    }
-
-    private static ByteBuffer toByteBuffer(long producerId, long firstOffset, 
long lastOffset, long lastStableOffset) {
-        ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE);
-        buffer.putShort(CURRENT_VERSION);
-        buffer.putLong(producerId);
-        buffer.putLong(firstOffset);
-        buffer.putLong(lastOffset);
-        buffer.putLong(lastStableOffset);
-        buffer.flip();
-        return buffer;
-    }
-
-    public short version() {
-        return buffer.get(VERSION_OFFSET);
-    }
-
-    public long producerId() {
-        return buffer.getLong(PRODUCER_ID_OFFSET);
-    }
-
-    public long firstOffset() {
-        return buffer.getLong(FIRST_OFFSET_OFFSET);
-    }
-
-    public long lastOffset() {
-        return buffer.getLong(LAST_OFFSET_OFFSET);
-    }
-
-    public long lastStableOffset() {
-        return buffer.getLong(LAST_STABLE_OFFSET_OFFSET);
-    }
-
-    public FetchResponseData.AbortedTransaction asAbortedTransaction() {
-        return new FetchResponseData.AbortedTransaction()
-            .setProducerId(producerId())
-            .setFirstOffset(firstOffset());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        AbortedTxn that = (AbortedTxn) o;
-        return buffer.equals(that.buffer);
-    }
-
-    @Override
-    public int hashCode() {
-        return buffer.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return "AbortedTxn(version=" + version()
-            + ", producerId=" + producerId()
-            + ", firstOffset=" + firstOffset()
-            + ", lastOffset=" + lastOffset()
-            + ", lastStableOffset=" + lastStableOffset()
-            + ")";
-    }
-
-}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
index 6c4224aa2cd..e076f7d4f3d 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index 47d4d8fbb39..de7c8893abd 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MutableRecordBatch;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index b8ecc886ab0..786673db1dd 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.record.internal.FileLogInputStream;
 import org.apache.kafka.common.record.internal.FileRecords;
@@ -540,7 +541,9 @@ public class LocalLog {
         List<FetchResponseData.AbortedTransaction> abortedTransactions = new 
ArrayList<>();
         Consumer<List<AbortedTxn>> accumulator = abortedTxns -> {
             for (AbortedTxn abortedTxn : abortedTxns)
-                abortedTransactions.add(abortedTxn.asAbortedTransaction());
+                abortedTransactions.add(new 
FetchResponseData.AbortedTransaction()
+                    .setProducerId(abortedTxn.producerId())
+                    .setFirstOffset(abortedTxn.firstOffset()));
         };
         collectAbortedTransactions(startOffset, upperBoundOffset, segment, 
accumulator);
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 4d9769e3e10..d461a705de5 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -18,6 +18,7 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.message.AbortedTxn;
 import 
org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.FileRecords.LogOffsetPosition;
@@ -348,7 +349,11 @@ public class LogSegment implements Closeable {
     public void updateTxnIndex(CompletedTxn completedTxn, long 
lastStableOffset) throws IOException {
         if (completedTxn.isAborted()) {
             LOGGER.trace("Writing aborted transaction {} to transaction index, 
last stable offset is {}", completedTxn, lastStableOffset);
-            txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset));
+            txnIndex.append(new AbortedTxn()
+                .setProducerId(completedTxn.producerId())
+                .setFirstOffset(completedTxn.firstOffset())
+                .setLastOffset(completedTxn.lastOffset())
+                .setLastStableOffset(lastStableOffset));
         }
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 6283012c165..f2eb074926f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.message.AbortedTxn;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
@@ -32,7 +34,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalLong;
-import java.util.function.Supplier;
 
 /**
  * The transaction index maintains metadata about the aborted transactions for 
each segment. This includes
@@ -47,6 +48,11 @@ import java.util.function.Supplier;
  */
 public class TransactionIndex implements Closeable {
 
+    // Note: if new fields are added to AbortedTxn, this code may need to be changed to read the
+    // version bytes first for each record and then determine the record body size based on the version.
+    private static final int ABORTED_TXN_RECORD_SIZE =
+        
MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, 
new AbortedTxn()).remaining();
+
     private record AbortedTxnWithPosition(AbortedTxn txn, int position) {
     }
 
@@ -82,7 +88,8 @@ public class TransactionIndex implements Closeable {
                     + file.getAbsolutePath());
         });
         lastOffset = OptionalLong.of(abortedTxn.lastOffset());
-        Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
+        ByteBuffer buffer = 
MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, 
abortedTxn);
+        Utils.writeFully(channel(), buffer);
     }
 
     public void flush() throws IOException {
@@ -130,13 +137,11 @@ public class TransactionIndex implements Closeable {
     }
 
     public void truncateTo(long offset) throws IOException {
-        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
         OptionalLong newLastOffset = OptionalLong.empty();
-        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+        for (AbortedTxnWithPosition txnWithPosition : iterable()) {
             AbortedTxn abortedTxn = txnWithPosition.txn;
-            long position = txnWithPosition.position;
             if (abortedTxn.lastOffset() >= offset) {
-                channel().truncate(position);
+                channel().truncate(txnWithPosition.position);
                 lastOffset = newLastOffset;
                 return;
             }
@@ -178,8 +183,7 @@ public class TransactionIndex implements Closeable {
      * @throws CorruptIndexException if any problems are found.
      */
     public void sanityCheck() {
-        ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
-        for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+        for (AbortedTxnWithPosition txnWithPosition : iterable()) {
             AbortedTxn abortedTxn = txnWithPosition.txn;
             if (abortedTxn.lastOffset() < startOffset)
                 throw new CorruptIndexException("Last offset of aborted 
transaction " + abortedTxn + " in index "
@@ -216,22 +220,18 @@ public class TransactionIndex implements Closeable {
     }
 
     private Iterable<AbortedTxnWithPosition> iterable() {
-        return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
-    }
-
-    private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> 
allocate) {
         FileChannel channel = channelOrNull();
         if (channel == null)
             return List.of();
 
-        PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
-
         return () -> new Iterator<>() {
+            private final ByteBuffer buffer = 
ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE);
+            private int position = 0;
 
             @Override
             public boolean hasNext() {
                 try {
-                    return channel.position() - position.value >= 
AbortedTxn.TOTAL_SIZE;
+                    return channel.position() - position >= 
ABORTED_TXN_RECORD_SIZE;
                 } catch (IOException e) {
                     throw new KafkaException("Failed read position from the 
transaction index " + file.getAbsolutePath(), e);
                 }
@@ -240,17 +240,18 @@ public class TransactionIndex implements Closeable {
             @Override
             public AbortedTxnWithPosition next() {
                 try {
-                    ByteBuffer buffer = allocate.get();
-                    Utils.readFully(channel, buffer, position.value);
+                    buffer.clear();
+                    Utils.readFully(channel, buffer, position);
                     buffer.flip();
 
-                    AbortedTxn abortedTxn = new AbortedTxn(buffer);
-                    if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
-                        throw new KafkaException("Unexpected aborted 
transaction version " + abortedTxn.version()
-                            + " in transaction index " + 
file.getAbsolutePath() + ", current version is "
-                            + AbortedTxn.CURRENT_VERSION);
-                    AbortedTxnWithPosition nextEntry = new 
AbortedTxnWithPosition(abortedTxn, position.value);
-                    position.value += AbortedTxn.TOTAL_SIZE;
+                    short version = buffer.getShort();
+                    if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION || 
version > AbortedTxn.HIGHEST_SUPPORTED_VERSION)
+                        throw new KafkaException("Unexpected aborted 
transaction version " + version
+                            + " in transaction index " + 
file.getAbsolutePath() + ", supported version range is "
+                            + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " + 
AbortedTxn.HIGHEST_SUPPORTED_VERSION);
+                    AbortedTxn abortedTxn = new AbortedTxn(new 
ByteBufferAccessor(buffer), version);
+                    AbortedTxnWithPosition nextEntry = new 
AbortedTxnWithPosition(abortedTxn, position);
+                    position += ABORTED_TXN_RECORD_SIZE;
                     return nextEntry;
                 } catch (IOException e) {
                     // We received an unexpected error reading from the index 
file. We propagate this as an
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
index 6f9a425ea1a..ef1b14fff6f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.message.AbortedTxn;
+
 import java.util.Collections;
 import java.util.List;
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index e523a3218c3..65f5042c41a 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.CompressionType;
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 166032da1f5..9af969fe82d 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.EndTransactionMarker;
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
index d8d3a7f5822..dc9c07839c9 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -1189,10 +1190,10 @@ public class RemoteIndexCacheTest {
         txnIdxFile.createNewFile();
         TransactionIndex txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile);
         List<AbortedTxn> abortedTxns = List.of(
-                new AbortedTxn(0L, 0, 10, 11),
-                new AbortedTxn(1L, 5, 15, 13),
-                new AbortedTxn(2L, 18, 35, 25),
-                new AbortedTxn(3L, 32, 50, 40));
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+                new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
         for (AbortedTxn abortedTxn : abortedTxns) {
             txnIndex.append(abortedTxn);
         }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
index c25c880cdf0..803122ac456 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.message.AbortedTxn;
+import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -23,7 +25,10 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -45,15 +50,15 @@ public class TransactionIndexTest {
     @Test
     public void testPositionSetCorrectlyWhenOpened() throws IOException {
         List<AbortedTxn> abortedTxns = new ArrayList<>(List.of(
-                new AbortedTxn(0L, 0, 10, 11),
-                new AbortedTxn(1L, 5, 15, 13),
-                new AbortedTxn(2L, 18, 35, 25),
-                new AbortedTxn(3L, 32, 50, 40)));
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+                new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)));
         abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
         index.close();
 
         TransactionIndex reopenedIndex = new TransactionIndex(0L, file);
-        AbortedTxn anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55);
+        AbortedTxn anotherAbortedTxn = new AbortedTxn().setProducerId(3L).setFirstOffset(50).setLastOffset(60).setLastStableOffset(55);
         reopenedIndex.append(anotherAbortedTxn);
         abortedTxns.add(anotherAbortedTxn);
         assertEquals(abortedTxns, reopenedIndex.allAbortedTxns());
@@ -62,10 +67,10 @@ public class TransactionIndexTest {
     @Test
     public void testSanityCheck() throws IOException {
         List<AbortedTxn> abortedTxns = List.of(
-                new AbortedTxn(0L, 0, 10, 11),
-                new AbortedTxn(1L, 5, 15, 13),
-                new AbortedTxn(2L, 18, 35, 25),
-                new AbortedTxn(3L, 32, 50, 40));
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+                new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
         abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
         index.close();
 
@@ -77,25 +82,25 @@ public class TransactionIndexTest {
 
     @Test
     public void testLastOffsetMustIncrease() throws IOException {
-        index.append(new AbortedTxn(1L, 5, 15, 13));
-        assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
-                15, 11)));
+        index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13));
+        assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0)
+                .setLastOffset(15).setLastStableOffset(11)));
     }
 
     @Test
     public void testLastOffsetCannotDecrease() throws IOException {
-        index.append(new AbortedTxn(1L, 5, 15, 13));
-        assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
-                10, 11)));
+        index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13));
+        assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0)
+                .setLastOffset(10).setLastStableOffset(11)));
     }
 
     @Test
     public void testCollectAbortedTransactions() {
         List<AbortedTxn> abortedTransactions = List.of(
-                new AbortedTxn(0L, 0, 10, 11),
-                new AbortedTxn(1L, 5, 15, 13),
-                new AbortedTxn(2L, 18, 35, 25),
-                new AbortedTxn(3L, 32, 50, 40));
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+                new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
 
         abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
 
@@ -127,10 +132,10 @@ public class TransactionIndexTest {
     @Test
     public void testTruncate() throws IOException {
         List<AbortedTxn> abortedTransactions = List.of(
-                new AbortedTxn(0L, 0, 10, 2),
-                new AbortedTxn(1L, 5, 15, 16),
-                new AbortedTxn(2L, 18, 35, 25),
-                new AbortedTxn(3L, 32, 50, 40));
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+                new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
 
         abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
 
@@ -151,8 +156,7 @@ public class TransactionIndexTest {
         long lastOffset = 299L;
         long lastStableOffset = 200L;
 
-        AbortedTxn abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset);
-        assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version());
+        AbortedTxn abortedTxn = new AbortedTxn().setProducerId(pid).setFirstOffset(firstOffset).setLastOffset(lastOffset).setLastStableOffset(lastStableOffset);
         assertEquals(pid, abortedTxn.producerId());
         assertEquals(firstOffset, abortedTxn.firstOffset());
         assertEquals(lastOffset, abortedTxn.lastOffset());
@@ -162,10 +166,10 @@ public class TransactionIndexTest {
     @Test
     public void testRenameIndex() throws IOException {
         File renamed = TestUtils.tempFile();
-        index.append(new AbortedTxn(0L, 0, 10, 2));
+        index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
 
         index.renameTo(renamed);
-        index.append(new AbortedTxn(1L, 5, 15, 16));
+        index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16));
 
         List<AbortedTxn> abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions();
         assertEquals(2, abortedTxns.size());
@@ -188,7 +192,7 @@ public class TransactionIndexTest {
         assertTrue(nonExistentFile.delete());
         try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) {
             testIndex.flush();
-            testIndex.append(new AbortedTxn(0L, 0, 10, 2));
+            testIndex.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
             testIndex.flush();
             assertNotEquals(0, testIndex.file().length());
         }
@@ -217,7 +221,67 @@ public class TransactionIndexTest {
 
     @Test
     public void testIsEmptyWhenFileIsNotEmpty() throws IOException {
-        index.append(new AbortedTxn(0L, 0, 10, 2));
+        index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
         assertFalse(index.isEmpty());
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testIterableReturnsIndependentIterators() throws Exception {
+        List<AbortedTxn> abortedTxns = List.of(
+                new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2),
+                new AbortedTxn().setProducerId(1L).setFirstOffset(11).setLastOffset(20).setLastStableOffset(15),
+                new AbortedTxn().setProducerId(2L).setFirstOffset(21).setLastOffset(30).setLastStableOffset(25));
+        abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
+
+        Method iterableMethod = TransactionIndex.class.getDeclaredMethod("iterable");
+        iterableMethod.setAccessible(true);
+        Iterable<Object> iterable = (Iterable<Object>) iterableMethod.invoke(index);
+
+        Iterator<Object> iter1 = iterable.iterator();
+        Iterator<Object> iter2 = iterable.iterator();
+
+        // Exhaust iter1
+        int count1 = 0;
+        while (iter1.hasNext()) {
+            iter1.next();
+            count1++;
+        }
+        assertEquals(3, count1);
+
+        // iter2 must be independent — still readable from the beginning
+        int count2 = 0;
+        while (iter2.hasNext()) {
+            iter2.next();
+            count2++;
+        }
+        assertEquals(3, count2);
+    }
+
+    @Test
+    public void testBinaryCompatibilityWithHandWrittenClass() {
+        long producerId = 983493L;
+        long firstOffset = 137L;
+        long lastOffset = 299L;
+        long lastStableOffset = 200L;
+
+        // Build the expected binary using the same layout as the old hand-written AbortedTxn
+        ByteBuffer expected = ByteBuffer.allocate(34);
+        expected.putShort((short) 0); // version
+        expected.putLong(producerId);
+        expected.putLong(firstOffset);
+        expected.putLong(lastOffset);
+        expected.putLong(lastStableOffset);
+        expected.flip();
+
+        // Serialize using the generated class
+        AbortedTxn abortedTxn = new AbortedTxn()
+            .setProducerId(producerId)
+            .setFirstOffset(firstOffset)
+            .setLastOffset(lastOffset)
+            .setLastStableOffset(lastStableOffset);
+        ByteBuffer actual = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn);
+
+        assertEquals(expected, actual);
+    }
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index a9aa710c701..f382fe803a3 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
@@ -4147,8 +4148,8 @@ public class UnifiedLogTest {
             abortedTransactions.addAll(segment.txnIndex().allAbortedTxns());
         }
         List<AbortedTxn> expectedTransactions = List.of(
-            new AbortedTxn(pid1, 0L, 29L, 8L),
-            new AbortedTxn(pid2, 8L, 74L, 36L)
+            new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+            new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
         );
         assertEquals(expectedTransactions, abortedTransactions);
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index e0e042ecae8..df4077060f6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.ConsumerProtocolAssignment;
 import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
@@ -60,7 +61,6 @@ import org.apache.kafka.server.util.CommandDefaultOptions;
 import org.apache.kafka.server.util.CommandLineUtils;
 import org.apache.kafka.snapshot.SnapshotPath;
 import org.apache.kafka.snapshot.Snapshots;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.BatchMetadata;
 import org.apache.kafka.storage.internals.log.CorruptSnapshotException;
 import org.apache.kafka.storage.internals.log.LogFileUtils;
@@ -156,7 +156,7 @@ public class DumpLogSegments {
     private static void dumpTxnIndex(File file) throws IOException {
         try (TransactionIndex index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)) {
             for (AbortedTxn abortedTxn : index.allAbortedTxns()) {
-                System.out.println("version: " + abortedTxn.version() +
+                System.out.println("version: " + AbortedTxn.HIGHEST_SUPPORTED_VERSION +
                     " producerId: " + abortedTxn.producerId() +
                     " firstOffset: " + abortedTxn.firstOffset() +
                     " lastOffset: " + abortedTxn.lastOffset() +
diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index 5b403d52352..9221f02df5a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
@@ -85,7 +86,6 @@ import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.AppendOrigin;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -1486,8 +1486,8 @@ public class DumpLogSegmentsTest {
     public void testDumpTxnIndex() throws Exception {
         File txnIndexFile = new File(logDir, segmentName + ".txnindex");
         try (TransactionIndex index = new TransactionIndex(0L, txnIndexFile)) {
-            index.append(new AbortedTxn(1L, 0, 10, 11));
-            index.append(new AbortedTxn(2L, 15, 25, 26));
+            index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11));
+            index.append(new AbortedTxn().setProducerId(2L).setFirstOffset(15).setLastOffset(25).setLastStableOffset(26));
             index.flush();
         }
 

Reply via email to