This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70dfc4236cd KAFKA-19562 Replace hand-written AbortedTxn with generated
protocol (#21577)
70dfc4236cd is described below
commit 70dfc4236cd865ba100ae56280fd7e60b8cde81f
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Tue Mar 24 01:47:21 2026 +0800
KAFKA-19562 Replace hand-written AbortedTxn with generated protocol (#21577)
Replace the manually maintained AbortedTxn class with a generated
protocol message defined in AbortedTxn.json.
Also simplify TransactionIndex buffer management, since the generated
class copies field values at construction time instead of holding a
buffer reference.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/resources/common/message/AbortedTxn.json | 36 ++++++
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 7 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 9 +-
.../test/scala/unit/kafka/log/LogTestUtils.scala | 3 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 7 +-
.../log/remote/storage/RemoteLogManager.java | 6 +-
.../kafka/storage/internals/log/AbortedTxn.java | 117 --------------------
.../internals/log/CleanedTransactionMetadata.java | 1 +
.../kafka/storage/internals/log/Cleaner.java | 1 +
.../kafka/storage/internals/log/LocalLog.java | 5 +-
.../kafka/storage/internals/log/LogSegment.java | 7 +-
.../storage/internals/log/TransactionIndex.java | 51 ++++-----
.../internals/log/TxnIndexSearchResult.java | 2 +
.../kafka/storage/internals/log/UnifiedLog.java | 1 +
.../storage/internals/log/LogSegmentTest.java | 1 +
.../internals/log/RemoteIndexCacheTest.java | 9 +-
.../internals/log/TransactionIndexTest.java | 122 ++++++++++++++++-----
.../storage/internals/log/UnifiedLogTest.java | 5 +-
.../org/apache/kafka/tools/DumpLogSegments.java | 4 +-
.../apache/kafka/tools/DumpLogSegmentsTest.java | 6 +-
20 files changed, 203 insertions(+), 197 deletions(-)
diff --git a/clients/src/main/resources/common/message/AbortedTxn.json
b/clients/src/main/resources/common/message/AbortedTxn.json
new file mode 100644
index 00000000000..bcd84dac953
--- /dev/null
+++ b/clients/src/main/resources/common/message/AbortedTxn.json
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type": "data",
+ "name": "AbortedTxn",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "ProducerId", "type": "int64", "versions": "0+",
+ "about": "The producer id associated with the aborted transaction"
+ },
+ { "name": "FirstOffset", "type": "int64", "versions": "0+",
+ "about": "The first offset in the aborted transaction"
+ },
+ { "name": "LastOffset", "type": "int64", "versions": "0+",
+ "about": "The last offset in the aborted transaction"
+ },
+ { "name": "LastStableOffset", "type": "int64", "versions": "0+",
+ "about": "The last stable offset at the time the transaction was aborted"
+ }
+ // Note: adding new fields may require TransactionIndex to be refactored
to read version-per-record.
+ ]
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0485f36b0ab..926aa14ddef 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,7 +30,8 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog,
LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException,
LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap,
ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin,
CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog,
LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException,
LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap,
ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
@@ -417,8 +418,8 @@ class LogCleanerTest extends Logging {
assertEquals(20L, log.logEndOffset)
val expectedAbortedTxns = util.List.of(
- new AbortedTxn(producerId1, 8, 10, 11),
- new AbortedTxn(producerId2, 11, 16, 17)
+ new
AbortedTxn().setProducerId(producerId1).setFirstOffset(8).setLastOffset(10).setLastStableOffset(11),
+ new
AbortedTxn().setProducerId(producerId2).setFirstOffset(11).setLastOffset(16).setLastStableOffset(17)
)
assertAllTransactionsComplete(log)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c479ff1c174..e65ba72a7a4 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -30,7 +30,8 @@ import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig,
EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader,
LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager,
ProducerStateManagerConfig, SnapshotFile, UnifiedLog}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{CleanerConfig, EpochEntry,
LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader,
LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager,
ProducerStateManagerConfig, SnapshotFile, UnifiedLog}
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals,
assertFalse, assertNotEquals, assertThrows, assertTrue}
@@ -1281,7 +1282,7 @@ class LogLoaderTest {
val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 *
5)
val reloadedLog = createLog(logDir, reloadedLogConfig, lastShutdownClean =
false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
- assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2,
8L, 74L, 36L)), abortedTransactions)
+ assertEquals(List(new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
abortedTransactions)
}
@Test
@@ -1332,7 +1333,7 @@ class LogLoaderTest {
val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 *
5)
val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint =
recoveryPoint, lastShutdownClean = false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
- assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2,
8L, 74L, 36L)), abortedTransactions)
+ assertEquals(List(new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
abortedTransactions)
}
@Test
@@ -1386,7 +1387,7 @@ class LogLoaderTest {
val reloadedLogConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 *
5)
val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint =
recoveryPoint, lastShutdownClean = false)
val abortedTransactions = LogTestUtils.allAbortedTransactions(reloadedLog)
- assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2,
8L, 74L, 36L)), abortedTransactions)
+ assertEquals(List(new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)),
abortedTransactions)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 5b4fc1d41e4..9875a3b2ce4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -37,7 +37,8 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
import
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel,
LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager,
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogOffsetsListener, LogSegment, ProducerStateManager,
ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import scala.jdk.CollectionConverters._
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 65a41b2579c..18c1f29002f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -36,7 +36,8 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot,
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason,
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.common.message.AbortedTxn
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -128,8 +129,8 @@ class UnifiedLogTest {
val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
val expectedTransactions = List(
- new AbortedTxn(pid1, 0L, 29L, 8L),
- new AbortedTxn(pid2, 8L, 74L, 36L)
+ new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+ new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
)
assertEquals(expectedTransactions, abortedTransactions)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 27713827fef..cb2764f142b 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
@@ -60,7 +61,6 @@ import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -1893,7 +1893,9 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
Consumer<List<AbortedTxn>> accumulator =
abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
- .map(AbortedTxn::asAbortedTransaction).toList());
+ .map(txn -> new FetchResponseData.AbortedTransaction()
+ .setProducerId(txn.producerId())
+ .setFirstOffset(txn.firstOffset())).toList());
long startTimeNs = time.nanoseconds();
collectAbortedTransactions(startOffset, upperBoundOffset,
segmentMetadata, accumulator, log);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java
deleted file mode 100644
index 05217d97ba1..00000000000
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbortedTxn.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.storage.internals.log;
-
-import org.apache.kafka.common.message.FetchResponseData;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-public class AbortedTxn {
- static final int VERSION_OFFSET = 0;
- static final int VERSION_SIZE = 2;
- static final int PRODUCER_ID_OFFSET = VERSION_OFFSET + VERSION_SIZE;
- static final int PRODUCER_ID_SIZE = 8;
- static final int FIRST_OFFSET_OFFSET = PRODUCER_ID_OFFSET +
PRODUCER_ID_SIZE;
- static final int FIRST_OFFSET_SIZE = 8;
- static final int LAST_OFFSET_OFFSET = FIRST_OFFSET_OFFSET +
FIRST_OFFSET_SIZE;
- static final int LAST_OFFSET_SIZE = 8;
- static final int LAST_STABLE_OFFSET_OFFSET = LAST_OFFSET_OFFSET +
LAST_OFFSET_SIZE;
- static final int LAST_STABLE_OFFSET_SIZE = 8;
- static final int TOTAL_SIZE = LAST_STABLE_OFFSET_OFFSET +
LAST_STABLE_OFFSET_SIZE;
-
- public static final short CURRENT_VERSION = 0;
-
- final ByteBuffer buffer;
-
- AbortedTxn(ByteBuffer buffer) {
- Objects.requireNonNull(buffer);
- this.buffer = buffer;
- }
-
- public AbortedTxn(CompletedTxn completedTxn, long lastStableOffset) {
- this(completedTxn.producerId(), completedTxn.firstOffset(),
completedTxn.lastOffset(), lastStableOffset);
- }
-
- public AbortedTxn(long producerId, long firstOffset, long lastOffset, long
lastStableOffset) {
- this(toByteBuffer(producerId, firstOffset, lastOffset,
lastStableOffset));
- }
-
- private static ByteBuffer toByteBuffer(long producerId, long firstOffset,
long lastOffset, long lastStableOffset) {
- ByteBuffer buffer = ByteBuffer.allocate(TOTAL_SIZE);
- buffer.putShort(CURRENT_VERSION);
- buffer.putLong(producerId);
- buffer.putLong(firstOffset);
- buffer.putLong(lastOffset);
- buffer.putLong(lastStableOffset);
- buffer.flip();
- return buffer;
- }
-
- public short version() {
- return buffer.get(VERSION_OFFSET);
- }
-
- public long producerId() {
- return buffer.getLong(PRODUCER_ID_OFFSET);
- }
-
- public long firstOffset() {
- return buffer.getLong(FIRST_OFFSET_OFFSET);
- }
-
- public long lastOffset() {
- return buffer.getLong(LAST_OFFSET_OFFSET);
- }
-
- public long lastStableOffset() {
- return buffer.getLong(LAST_STABLE_OFFSET_OFFSET);
- }
-
- public FetchResponseData.AbortedTransaction asAbortedTransaction() {
- return new FetchResponseData.AbortedTransaction()
- .setProducerId(producerId())
- .setFirstOffset(firstOffset());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- AbortedTxn that = (AbortedTxn) o;
- return buffer.equals(that.buffer);
- }
-
- @Override
- public int hashCode() {
- return buffer.hashCode();
- }
-
- @Override
- public String toString() {
- return "AbortedTxn(version=" + version()
- + ", producerId=" + producerId()
- + ", firstOffset=" + firstOffset()
- + ", lastOffset=" + lastOffset()
- + ", lastStableOffset=" + lastStableOffset()
- + ")";
- }
-
-}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
index 6c4224aa2cd..e076f7d4f3d 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanedTransactionMetadata.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index 47d4d8fbb39..de7c8893abd 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.MutableRecordBatch;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
index b8ecc886ab0..786673db1dd 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.internal.FileLogInputStream;
import org.apache.kafka.common.record.internal.FileRecords;
@@ -540,7 +541,9 @@ public class LocalLog {
List<FetchResponseData.AbortedTransaction> abortedTransactions = new
ArrayList<>();
Consumer<List<AbortedTxn>> accumulator = abortedTxns -> {
for (AbortedTxn abortedTxn : abortedTxns)
- abortedTransactions.add(abortedTxn.asAbortedTransaction());
+ abortedTransactions.add(new
FetchResponseData.AbortedTransaction()
+ .setProducerId(abortedTxn.producerId())
+ .setFirstOffset(abortedTxn.firstOffset()));
};
collectAbortedTransactions(startOffset, upperBoundOffset, segment,
accumulator);
return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 4d9769e3e10..d461a705de5 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -18,6 +18,7 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.message.AbortedTxn;
import
org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.FileRecords.LogOffsetPosition;
@@ -348,7 +349,11 @@ public class LogSegment implements Closeable {
public void updateTxnIndex(CompletedTxn completedTxn, long
lastStableOffset) throws IOException {
if (completedTxn.isAborted()) {
LOGGER.trace("Writing aborted transaction {} to transaction index,
last stable offset is {}", completedTxn, lastStableOffset);
- txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset));
+ txnIndex.append(new AbortedTxn()
+ .setProducerId(completedTxn.producerId())
+ .setFirstOffset(completedTxn.firstOffset())
+ .setLastOffset(completedTxn.lastOffset())
+ .setLastStableOffset(lastStableOffset));
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 6283012c165..f2eb074926f 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -17,7 +17,9 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.message.AbortedTxn;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
@@ -32,7 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.function.Supplier;
/**
* The transaction index maintains metadata about the aborted transactions for
each segment. This includes
@@ -47,6 +48,11 @@ import java.util.function.Supplier;
*/
public class TransactionIndex implements Closeable {
+ // Note: if new fields are added to AbortedTxn, this code may need to be
changed to read the
+ // version bytes first for each record and then determine the record body
size based on the version.
+ private static final int ABORTED_TXN_RECORD_SIZE =
+
MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION,
new AbortedTxn()).remaining();
+
private record AbortedTxnWithPosition(AbortedTxn txn, int position) {
}
@@ -82,7 +88,8 @@ public class TransactionIndex implements Closeable {
+ file.getAbsolutePath());
});
lastOffset = OptionalLong.of(abortedTxn.lastOffset());
- Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
+ ByteBuffer buffer =
MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION,
abortedTxn);
+ Utils.writeFully(channel(), buffer);
}
public void flush() throws IOException {
@@ -130,13 +137,11 @@ public class TransactionIndex implements Closeable {
}
public void truncateTo(long offset) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
OptionalLong newLastOffset = OptionalLong.empty();
- for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+ for (AbortedTxnWithPosition txnWithPosition : iterable()) {
AbortedTxn abortedTxn = txnWithPosition.txn;
- long position = txnWithPosition.position;
if (abortedTxn.lastOffset() >= offset) {
- channel().truncate(position);
+ channel().truncate(txnWithPosition.position);
lastOffset = newLastOffset;
return;
}
@@ -178,8 +183,7 @@ public class TransactionIndex implements Closeable {
* @throws CorruptIndexException if any problems are found.
*/
public void sanityCheck() {
- ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
- for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
+ for (AbortedTxnWithPosition txnWithPosition : iterable()) {
AbortedTxn abortedTxn = txnWithPosition.txn;
if (abortedTxn.lastOffset() < startOffset)
throw new CorruptIndexException("Last offset of aborted
transaction " + abortedTxn + " in index "
@@ -216,22 +220,18 @@ public class TransactionIndex implements Closeable {
}
private Iterable<AbortedTxnWithPosition> iterable() {
- return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
- }
-
- private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer>
allocate) {
FileChannel channel = channelOrNull();
if (channel == null)
return List.of();
- PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
-
return () -> new Iterator<>() {
+ private final ByteBuffer buffer =
ByteBuffer.allocate(ABORTED_TXN_RECORD_SIZE);
+ private int position = 0;
@Override
public boolean hasNext() {
try {
- return channel.position() - position.value >=
AbortedTxn.TOTAL_SIZE;
+ return channel.position() - position >=
ABORTED_TXN_RECORD_SIZE;
} catch (IOException e) {
throw new KafkaException("Failed read position from the
transaction index " + file.getAbsolutePath(), e);
}
@@ -240,17 +240,18 @@ public class TransactionIndex implements Closeable {
@Override
public AbortedTxnWithPosition next() {
try {
- ByteBuffer buffer = allocate.get();
- Utils.readFully(channel, buffer, position.value);
+ buffer.clear();
+ Utils.readFully(channel, buffer, position);
buffer.flip();
- AbortedTxn abortedTxn = new AbortedTxn(buffer);
- if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
- throw new KafkaException("Unexpected aborted
transaction version " + abortedTxn.version()
- + " in transaction index " +
file.getAbsolutePath() + ", current version is "
- + AbortedTxn.CURRENT_VERSION);
- AbortedTxnWithPosition nextEntry = new
AbortedTxnWithPosition(abortedTxn, position.value);
- position.value += AbortedTxn.TOTAL_SIZE;
+ short version = buffer.getShort();
+ if (version < AbortedTxn.LOWEST_SUPPORTED_VERSION ||
version > AbortedTxn.HIGHEST_SUPPORTED_VERSION)
+ throw new KafkaException("Unexpected aborted
transaction version " + version
+ + " in transaction index " +
file.getAbsolutePath() + ", supported version range is "
+ + AbortedTxn.LOWEST_SUPPORTED_VERSION + " to " +
AbortedTxn.HIGHEST_SUPPORTED_VERSION);
+ AbortedTxn abortedTxn = new AbortedTxn(new
ByteBufferAccessor(buffer), version);
+ AbortedTxnWithPosition nextEntry = new
AbortedTxnWithPosition(abortedTxn, position);
+ position += ABORTED_TXN_RECORD_SIZE;
return nextEntry;
} catch (IOException e) {
// We received an unexpected error reading from the index
file. We propagate this as an
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
index 6f9a425ea1a..ef1b14fff6f 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TxnIndexSearchResult.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.message.AbortedTxn;
+
import java.util.Collections;
import java.util.List;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index e523a3218c3..65f5042c41a 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.CompressionType;
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 166032da1f5..9af969fe82d 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.EndTransactionMarker;
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
index d8d3a7f5822..dc9c07839c9 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -1189,10 +1190,10 @@ public class RemoteIndexCacheTest {
txnIdxFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile);
List<AbortedTxn> abortedTxns = List.of(
- new AbortedTxn(0L, 0, 10, 11),
- new AbortedTxn(1L, 5, 15, 13),
- new AbortedTxn(2L, 18, 35, 25),
- new AbortedTxn(3L, 32, 50, 40));
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+ new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
for (AbortedTxn abortedTxn : abortedTxns) {
txnIndex.append(abortedTxn);
}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
index c25c880cdf0..803122ac456 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.message.AbortedTxn;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -23,7 +25,10 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -45,15 +50,15 @@ public class TransactionIndexTest {
@Test
public void testPositionSetCorrectlyWhenOpened() throws IOException {
List<AbortedTxn> abortedTxns = new ArrayList<>(List.of(
- new AbortedTxn(0L, 0, 10, 11),
- new AbortedTxn(1L, 5, 15, 13),
- new AbortedTxn(2L, 18, 35, 25),
- new AbortedTxn(3L, 32, 50, 40)));
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+ new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40)));
abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
index.close();
TransactionIndex reopenedIndex = new TransactionIndex(0L, file);
- AbortedTxn anotherAbortedTxn = new AbortedTxn(3L, 50, 60, 55);
+ AbortedTxn anotherAbortedTxn = new AbortedTxn().setProducerId(3L).setFirstOffset(50).setLastOffset(60).setLastStableOffset(55);
reopenedIndex.append(anotherAbortedTxn);
abortedTxns.add(anotherAbortedTxn);
assertEquals(abortedTxns, reopenedIndex.allAbortedTxns());
@@ -62,10 +67,10 @@ public class TransactionIndexTest {
@Test
public void testSanityCheck() throws IOException {
List<AbortedTxn> abortedTxns = List.of(
- new AbortedTxn(0L, 0, 10, 11),
- new AbortedTxn(1L, 5, 15, 13),
- new AbortedTxn(2L, 18, 35, 25),
- new AbortedTxn(3L, 32, 50, 40));
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+ new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
index.close();
@@ -77,25 +82,25 @@ public class TransactionIndexTest {
@Test
public void testLastOffsetMustIncrease() throws IOException {
- index.append(new AbortedTxn(1L, 5, 15, 13));
- assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
- 15, 11)));
+ index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13));
+ assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0)
+ .setLastOffset(15).setLastStableOffset(11)));
}
@Test
public void testLastOffsetCannotDecrease() throws IOException {
- index.append(new AbortedTxn(1L, 5, 15, 13));
- assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn(0L, 0,
- 10, 11)));
+ index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13));
+ assertThrows(IllegalArgumentException.class, () -> index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0)
+ .setLastOffset(10).setLastStableOffset(11)));
}
@Test
public void testCollectAbortedTransactions() {
List<AbortedTxn> abortedTransactions = List.of(
- new AbortedTxn(0L, 0, 10, 11),
- new AbortedTxn(1L, 5, 15, 13),
- new AbortedTxn(2L, 18, 35, 25),
- new AbortedTxn(3L, 32, 50, 40));
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(13),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+ new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
@@ -127,10 +132,10 @@ public class TransactionIndexTest {
@Test
public void testTruncate() throws IOException {
List<AbortedTxn> abortedTransactions = List.of(
- new AbortedTxn(0L, 0, 10, 2),
- new AbortedTxn(1L, 5, 15, 16),
- new AbortedTxn(2L, 18, 35, 25),
- new AbortedTxn(3L, 32, 50, 40));
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(18).setLastOffset(35).setLastStableOffset(25),
+ new AbortedTxn().setProducerId(3L).setFirstOffset(32).setLastOffset(50).setLastStableOffset(40));
abortedTransactions.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
@@ -151,8 +156,7 @@ public class TransactionIndexTest {
long lastOffset = 299L;
long lastStableOffset = 200L;
- AbortedTxn abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset);
- assertEquals(AbortedTxn.CURRENT_VERSION, abortedTxn.version());
+ AbortedTxn abortedTxn = new AbortedTxn().setProducerId(pid).setFirstOffset(firstOffset).setLastOffset(lastOffset).setLastStableOffset(lastStableOffset);
assertEquals(pid, abortedTxn.producerId());
assertEquals(firstOffset, abortedTxn.firstOffset());
assertEquals(lastOffset, abortedTxn.lastOffset());
@@ -162,10 +166,10 @@ public class TransactionIndexTest {
@Test
public void testRenameIndex() throws IOException {
File renamed = TestUtils.tempFile();
- index.append(new AbortedTxn(0L, 0, 10, 2));
+ index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
index.renameTo(renamed);
- index.append(new AbortedTxn(1L, 5, 15, 16));
+ index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(5).setLastOffset(15).setLastStableOffset(16));
List<AbortedTxn> abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions();
assertEquals(2, abortedTxns.size());
@@ -188,7 +192,7 @@ public class TransactionIndexTest {
assertTrue(nonExistentFile.delete());
try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) {
testIndex.flush();
- testIndex.append(new AbortedTxn(0L, 0, 10, 2));
+ testIndex.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
testIndex.flush();
assertNotEquals(0, testIndex.file().length());
}
@@ -217,7 +221,67 @@ public class TransactionIndexTest {
@Test
public void testIsEmptyWhenFileIsNotEmpty() throws IOException {
- index.append(new AbortedTxn(0L, 0, 10, 2));
+ index.append(new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2));
assertFalse(index.isEmpty());
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testIterableReturnsIndependentIterators() throws Exception {
+ List<AbortedTxn> abortedTxns = List.of(
+ new AbortedTxn().setProducerId(0L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(2),
+ new AbortedTxn().setProducerId(1L).setFirstOffset(11).setLastOffset(20).setLastStableOffset(15),
+ new AbortedTxn().setProducerId(2L).setFirstOffset(21).setLastOffset(30).setLastStableOffset(25));
+ abortedTxns.forEach(txn -> assertDoesNotThrow(() -> index.append(txn)));
+
+ Method iterableMethod = TransactionIndex.class.getDeclaredMethod("iterable");
+ iterableMethod.setAccessible(true);
+ Iterable<Object> iterable = (Iterable<Object>) iterableMethod.invoke(index);
+
+ Iterator<Object> iter1 = iterable.iterator();
+ Iterator<Object> iter2 = iterable.iterator();
+
+ // Exhaust iter1
+ int count1 = 0;
+ while (iter1.hasNext()) {
+ iter1.next();
+ count1++;
+ }
+ assertEquals(3, count1);
+
+ // iter2 must be independent — still readable from the beginning
+ int count2 = 0;
+ while (iter2.hasNext()) {
+ iter2.next();
+ count2++;
+ }
+ assertEquals(3, count2);
+ }
+
+ @Test
+ public void testBinaryCompatibilityWithHandWrittenClass() {
+ long producerId = 983493L;
+ long firstOffset = 137L;
+ long lastOffset = 299L;
+ long lastStableOffset = 200L;
+
+ // Build the expected binary using the same layout as the old hand-written AbortedTxn
+ ByteBuffer expected = ByteBuffer.allocate(34);
+ expected.putShort((short) 0); // version
+ expected.putLong(producerId);
+ expected.putLong(firstOffset);
+ expected.putLong(lastOffset);
+ expected.putLong(lastStableOffset);
+ expected.flip();
+
+ // Serialize using the generated class
+ AbortedTxn abortedTxn = new AbortedTxn()
+ .setProducerId(producerId)
+ .setFirstOffset(firstOffset)
+ .setLastOffset(lastOffset)
+ .setLastStableOffset(lastStableOffset);
+ ByteBuffer actual = MessageUtil.toVersionPrefixedByteBuffer(AbortedTxn.HIGHEST_SUPPORTED_VERSION, abortedTxn);
+
+ assertEquals(expected, actual);
+ }
}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index a9aa710c701..f382fe803a3 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
@@ -4147,8 +4148,8 @@ public class UnifiedLogTest {
abortedTransactions.addAll(segment.txnIndex().allAbortedTxns());
}
List<AbortedTxn> expectedTransactions = List.of(
- new AbortedTxn(pid1, 0L, 29L, 8L),
- new AbortedTxn(pid2, 8L, 74L, 36L)
+ new AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+ new AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
);
assertEquals(expectedTransactions, abortedTransactions);
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index e0e042ecae8..df4077060f6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
@@ -60,7 +61,6 @@ import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.snapshot.SnapshotPath;
import org.apache.kafka.snapshot.Snapshots;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.BatchMetadata;
import org.apache.kafka.storage.internals.log.CorruptSnapshotException;
import org.apache.kafka.storage.internals.log.LogFileUtils;
@@ -156,7 +156,7 @@ public class DumpLogSegments {
private static void dumpTxnIndex(File file) throws IOException {
try (TransactionIndex index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)) {
for (AbortedTxn abortedTxn : index.allAbortedTxns()) {
- System.out.println("version: " + abortedTxn.version() +
+ System.out.println("version: " + AbortedTxn.HIGHEST_SUPPORTED_VERSION +
" producerId: " + abortedTxn.producerId() +
" firstOffset: " + abortedTxn.firstOffset() +
" lastOffset: " + abortedTxn.lastOffset() +
diff --git a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index 5b403d52352..9221f02df5a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
@@ -85,7 +86,6 @@ import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
-import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
@@ -1486,8 +1486,8 @@ public class DumpLogSegmentsTest {
public void testDumpTxnIndex() throws Exception {
File txnIndexFile = new File(logDir, segmentName + ".txnindex");
try (TransactionIndex index = new TransactionIndex(0L, txnIndexFile)) {
- index.append(new AbortedTxn(1L, 0, 10, 11));
- index.append(new AbortedTxn(2L, 15, 25, 26));
+ index.append(new AbortedTxn().setProducerId(1L).setFirstOffset(0).setLastOffset(10).setLastStableOffset(11));
+ index.append(new AbortedTxn().setProducerId(2L).setFirstOffset(15).setLastOffset(25).setLastStableOffset(26));
index.flush();
}