This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c42359579b KAFKA-19752: Move UnifiedLogTest to storage module (#21844)
3c42359579b is described below

commit 3c42359579bba930d254a69812ac6b04b179628d
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Mar 26 21:29:44 2026 +0800

    KAFKA-19752: Move UnifiedLogTest to storage module (#21844)
    
    testTransactionIndexUpdatedThroughReplication ~
    testRenamingDirWithoutReinitialization
    
    Reviewers: Christo Lolov <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 723 +--------------------
 .../kafka/storage/internals/log/LogTestUtils.java  |   9 +
 .../storage/internals/log/UnifiedLogTest.java      | 667 +++++++++++++++++++
 3 files changed, 678 insertions(+), 721 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 18c1f29002f..12be8551252 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -24,22 +24,18 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 
-import org.apache.kafka.common.message.AbortedTxn
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, 
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, 
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, 
UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
@@ -47,7 +43,6 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doThrow, spy}
 
 import java.io._
-import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.util
 import java.util.concurrent.ConcurrentHashMap
@@ -85,679 +80,6 @@ class UnifiedLogTest {
     }
   }
 
-
-  @Test
-  def testTransactionIndexUpdatedThroughReplication(): Unit = {
-    val epoch = 0.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val buffer = ByteBuffer.allocate(2048)
-
-    val pid1 = 1L
-    val pid2 = 2L
-    val pid3 = 3L
-    val pid4 = 4L
-
-    val appendPid1 = appendTransactionalToBuffer(buffer, pid1, epoch)
-    val appendPid2 = appendTransactionalToBuffer(buffer, pid2, epoch)
-    val appendPid3 = appendTransactionalToBuffer(buffer, pid3, epoch)
-    val appendPid4 = appendTransactionalToBuffer(buffer, pid4, epoch)
-
-    appendPid1(0L, 5)
-    appendNonTransactionalToBuffer(buffer, 5L, 3)
-    appendPid2(8L, 2)
-    appendPid1(10L, 4)
-    appendPid3(14L, 3)
-    appendNonTransactionalToBuffer(buffer, 17L, 2)
-    appendPid1(19L, 10)
-    appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L, 
ControlRecordType.ABORT)
-    appendPid2(30L, 6)
-    appendPid4(36L, 3)
-    appendNonTransactionalToBuffer(buffer, 39L, 10)
-    appendPid3(49L, 9)
-    appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L, 
ControlRecordType.COMMIT)
-    appendPid4(59L, 8)
-    appendPid2(67L, 7)
-    appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L, 
ControlRecordType.ABORT)
-    appendNonTransactionalToBuffer(buffer, 75L, 10)
-    appendPid4(85L, 4)
-    appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L, 
ControlRecordType.COMMIT)
-
-    buffer.flip()
-
-    appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch)
-
-    val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
-    val expectedTransactions = List(
-      new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
-      new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
-    )
-
-    assertEquals(expectedTransactions, abortedTransactions)
-
-    // Verify caching of the segment position of the first unstable offset
-    log.updateHighWatermark(30L)
-    assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
-
-    log.updateHighWatermark(75L)
-    assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-  }
-
-  private def assertCachedFirstUnstableOffset(log: UnifiedLog, expectedOffset: 
Long): Unit = {
-    assertTrue(log.producerStateManager.firstUnstableOffset.isPresent)
-    val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get
-    assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
-    assertFalse(firstUnstableOffset.messageOffsetOnly)
-    assertValidLogOffsetMetadata(log, firstUnstableOffset)
-  }
-
-  private def assertValidLogOffsetMetadata(log: UnifiedLog, offsetMetadata: 
LogOffsetMetadata): Unit = {
-    assertFalse(offsetMetadata.messageOffsetOnly)
-
-    val segmentBaseOffset = offsetMetadata.segmentBaseOffset
-    val segments = log.logSegments(segmentBaseOffset, segmentBaseOffset + 1)
-    assertFalse(segments.isEmpty)
-
-    val segment = segments.iterator().next()
-    assertEquals(segmentBaseOffset, segment.baseOffset)
-    assertTrue(offsetMetadata.relativePositionInSegment <= segment.size)
-
-    val readInfo = segment.read(offsetMetadata.messageOffset,
-      2048,
-      Optional.of(segment.size),
-      false)
-
-    if (offsetMetadata.relativePositionInSegment < segment.size)
-      assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
-    else
-      assertNull(readInfo)
-  }
-
-  @Test
-  def testZombieCoordinatorFenced(): Unit = {
-    val pid = 1L
-    val epoch = 0.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
-
-    append(10)
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-      coordinatorEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-
-    append(5)
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.COMMIT, mockTime.milliseconds(),
-      coordinatorEpoch = 2, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-
-    assertThrows(
-      classOf[TransactionCoordinatorFencedException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-        coordinatorEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
-  }
-
-  @Test
-  def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
-    val pid = 1L
-    val epoch = 0.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val buffer = ByteBuffer.allocate(256)
-    val append = appendTransactionalToBuffer(buffer, pid, epoch, 1)
-    append(0, 10)
-    appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, 
ControlRecordType.COMMIT, 1)
-
-    buffer.flip()
-    log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch)
-
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-      coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-      coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    assertThrows(classOf[TransactionCoordinatorFencedException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-        coordinatorEpoch = 1, leaderEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
-  }
-
-  @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with 
transactionVersion={0}")
-  @ValueSource(shorts = Array(1, 2))
-  def testEndTxnWithFencedProducerEpoch(transactionVersion: Short): Unit = {
-    val producerId = 1L
-    val epoch = 5.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    
-    // First, write some transactional records to establish the current epoch
-    val records = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, epoch, 0,
-      new SimpleRecord("key".getBytes, "value".getBytes)
-    )
-    log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
-    
-    // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 and 
TV2
-    // TV0/TV1: markerEpoch < currentEpoch is rejected
-    // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
-    assertThrows(classOf[InvalidProducerEpochException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (epoch - 
1).toShort, 
-        ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
-        leaderEpoch = 0, transactionVersion = transactionVersion))
-    
-    // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
-    // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
-    // TV2: same epoch is rejected (requires strict >, markerEpoch > 
currentEpoch)
-    if (transactionVersion >= 2) {
-      // TV2: same epoch should be rejected
-      assertThrows(classOf[InvalidProducerEpochException],
-        () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, 
-          ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
-          leaderEpoch = 0, transactionVersion = transactionVersion))
-    } else {
-      // TV0/TV1: same epoch should be allowed
-      assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, 
producerId, epoch, 
-        ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 
1, 
-        leaderEpoch = 0, transactionVersion = transactionVersion))
-    }
-  }
-
-  @Test
-  def testTV2MarkerWithBumpedEpochSucceeds(): Unit = {
-    // Test that TV2 markers with bumped epochs (epoch + 1) are accepted 
(positive case)
-    // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so 
markerEpoch = currentEpoch + 1
-    val transactionVersion: Short = 2
-    val producerId = 1L
-    val epoch = 5.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    
-    // First, write some transactional records to establish the current epoch
-    val records = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, epoch, 0,
-      new SimpleRecord("key".getBytes, "value".getBytes)
-    )
-    log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
-    
-    // TV2: Verify that bumped epoch (epoch + 1) is accepted
-    val bumpedEpoch = (epoch + 1).toShort
-    assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, 
producerId, bumpedEpoch,
-      ControlRecordType.COMMIT, mockTime.milliseconds(), coordinatorEpoch = 1,
-      leaderEpoch = 0, transactionVersion = 
TransactionVersion.TV_2.featureLevel()))
-    
-    // Verify the marker was successfully appended by checking producer state
-    val producerState = 
log.producerStateManager.activeProducers.get(producerId)
-    assertNotNull(producerState)
-    // After a commit marker, the producer epoch should be updated to the 
bumped epoch for TV2
-    assertEquals(bumpedEpoch, producerState.producerEpoch)
-  }
-
-  @Test
-  def testReplicationWithTVUnknownAllowed(): Unit = {
-    // Test that TV_UNKNOWN is allowed for replication (REPLICATION origin) 
and uses TV_0 validation
-    // This simulates the scenario where:
-    // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2 and 
validates with strict TV2 rules
-    // 2. Leader writes MemoryRecords to log (transactionVersion is not stored 
in MemoryRecords)
-    // 3. Follower receives MemoryRecords via replication (without 
transactionVersion metadata)
-    // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more 
permissive, safe because leader already validated)
-    
-    val producerId = 1L
-    val epoch = 5.toShort
-    val coordinatorEpoch = 1
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    
-    // Step 1: Write transactional records as leader to establish current epoch
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, epoch, 0,
-      new SimpleRecord("key".getBytes, "value".getBytes)
-    )
-    log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel())
-    
-    // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch + 1)
-    // This is what happens at the leader when WriteTxnMarkersRequest is 
received
-    val bumpedEpoch = (epoch + 1).toShort
-    val leaderMarker = MemoryRecords.withEndTransactionMarker(
-      mockTime.milliseconds(),
-      producerId,
-      bumpedEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
-    )
-    // Leader validates with TV2 (strict: markerEpoch > currentEpoch)
-    log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel())
-    
-    // Verify leader state
-    val leaderProducerState = 
log.producerStateManager.activeProducers.get(producerId)
-    assertNotNull(leaderProducerState)
-    assertEquals(bumpedEpoch, leaderProducerState.producerEpoch)
-    
-    // Step 3: Create a new log to simulate a follower
-    val followerLogDir = TestUtils.randomPartitionLogDir(tmpDir)
-    val followerLog = createLog(followerLogDir, logConfig)
-    
-    // Step 4: Follower replicates transactional records first
-    val followerTransactionalRecords = MemoryRecords.withTransactionalRecords(
-      0L,
-      Compression.NONE, producerId, epoch, 0,
-      0,
-      new SimpleRecord("key".getBytes, "value".getBytes)
-    )
-    followerLog.appendAsFollower(followerTransactionalRecords, 0)
-    
-    // Step 5: Follower replicates the marker (appendAsFollower uses 
TV_UNKNOWN internally)
-    // This should succeed because TV_UNKNOWN is allowed for REPLICATION origin
-    // and defaults to TV_0 validation (markerEpoch >= currentEpoch), which is 
more permissive
-    // The marker should be at offset 1 (after the transactional record at 
offset 0)
-    val followerMarker = MemoryRecords.withEndTransactionMarker(
-      1L, // offset after the transactional record
-      mockTime.milliseconds(),
-      0, // partition leader epoch
-      producerId,
-      bumpedEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
-    )
-    
-    // This should not throw an exception - TV_UNKNOWN is allowed for 
replication
-    assertDoesNotThrow(() => followerLog.appendAsFollower(followerMarker, 0))
-    
-    // Verify follower state matches leader state
-    val followerProducerState = 
followerLog.producerStateManager.activeProducers.get(producerId)
-    assertNotNull(followerProducerState)
-    assertEquals(bumpedEpoch, followerProducerState.producerEpoch)
-    assertEquals(coordinatorEpoch, followerProducerState.coordinatorEpoch)
-    
-    // Verify the marker was written to the follower log
-    assertEquals(2L, followerLog.logEndOffset) // 1 transactional record + 1 
marker
-  }
-
-  @Test
-  def testLeaderRejectsTVUnknownForTransactionMarker(): Unit = {
-    // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader writing 
transaction markers)
-    // TV_UNKNOWN is only allowed for REPLICATION origin (followers)
-    val producerId = 1L
-    val epoch = 5.toShort
-    val coordinatorEpoch = 1
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    
-    // Write transactional records as leader to establish current epoch
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, epoch, 0,
-      new SimpleRecord("key".getBytes, "value".getBytes)
-    )
-    log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel())
-    
-    // Attempt to write a transaction marker with TV_UNKNOWN as COORDINATOR 
(leader)
-    // This should throw IllegalArgumentException because TV_UNKNOWN is not 
allowed for COORDINATOR origin
-    val marker = MemoryRecords.withEndTransactionMarker(
-      mockTime.milliseconds(),
-      producerId,
-      (epoch + 1).toShort, // bumped epoch for TV2
-      new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
-    )
-    
-    val exception = assertThrows(classOf[IllegalArgumentException], () => {
-      log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_UNKNOWN)
-    })
-    
-    assertTrue(exception.getMessage.contains("transactionVersion must be 
explicitly specified"))
-    assertTrue(exception.getMessage.contains("TV_UNKNOWN"))
-    assertTrue(exception.getMessage.contains("COORDINATOR"))
-  }
-
-  @Test
-  def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val epoch = 0.toShort
-    val pid = 1L
-    val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
-
-    appendPid(5)
-    LogTestUtils.appendNonTransactionalAsLeader(log, 3)
-    assertEquals(8L, log.logEndOffset)
-
-    log.roll()
-    assertEquals(2, log.logSegments.size)
-    appendPid(5)
-
-    assertEquals(Optional.of(0L), log.firstUnstableOffset)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(5L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-
-    // the first unstable offset should be lower bounded by the log start 
offset
-    assertEquals(Optional.of(5L), log.firstUnstableOffset)
-  }
-
-  @Test
-  def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): 
Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val epoch = 0.toShort
-    val pid = 1L
-    val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
-
-    appendPid(5)
-    LogTestUtils.appendNonTransactionalAsLeader(log, 3)
-    assertEquals(8L, log.logEndOffset)
-
-    log.roll()
-    assertEquals(2, log.logSegments.size)
-    appendPid(5)
-
-    assertEquals(Optional.of(0L), log.firstUnstableOffset)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(8L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    log.updateHighWatermark(log.logEndOffset)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(1, log.logSegments.size)
-
-    // the first unstable offset should be lower bounded by the log start 
offset
-    assertEquals(Optional.of(8L), log.firstUnstableOffset)
-  }
-
-  @Test
-  def testAppendToTransactionIndexFailure(): Unit = {
-    val pid = 1L
-    val epoch = 0.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
-    append(10)
-
-    // Kind of a hack, but renaming the index to a directory ensures that the 
append
-    // to the index will fail.
-    log.activeSegment.txnIndex.renameTo(log.dir)
-
-    // The append will be written to the log successfully, but the write to 
the index will fail
-    assertThrows(
-      classOf[KafkaStorageException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-        coordinatorEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
-    assertEquals(11L, log.logEndOffset)
-    assertEquals(0L, log.lastStableOffset)
-
-    // Try the append a second time. The appended offset in the log should not 
increase
-    // because the log dir is marked as failed.  Nor will there be a write to 
the transaction
-    // index.
-    assertThrows(
-      classOf[KafkaStorageException],
-      () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
-        coordinatorEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
-    assertEquals(11L, log.logEndOffset)
-    assertEquals(0L, log.lastStableOffset)
-
-    // Even if the high watermark is updated, the first unstable offset does 
not move
-    log.updateHighWatermark(12L)
-    assertEquals(0L, log.lastStableOffset)
-
-    assertThrows(classOf[KafkaStorageException], () => log.close())
-    val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
-    assertEquals(11L, reopenedLog.logEndOffset)
-    assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
-    reopenedLog.updateHighWatermark(12L)
-    assertEquals(Optional.empty, reopenedLog.firstUnstableOffset)
-  }
-
-  @Test
-  def testOffsetSnapshot(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    // append a few records
-    appendAsFollower(
-      log,
-      MemoryRecords.withRecords(
-        Compression.NONE,
-        new SimpleRecord("a".getBytes),
-        new SimpleRecord("b".getBytes),
-        new SimpleRecord("c".getBytes)
-      ),
-      5
-    )
-
-
-    log.updateHighWatermark(3L)
-    var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 3L)
-    assertFalse(offsets.highWatermark.messageOffsetOnly)
-
-    offsets = log.fetchOffsetSnapshot
-    assertEquals(offsets.highWatermark.messageOffset, 3L)
-    assertFalse(offsets.highWatermark.messageOffsetOnly)
-  }
-
-  @Test
-  def testLastStableOffsetWithMixedProducerData(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    // for convenience, both producers share the same epoch
-    val epoch = 5.toShort
-
-    val pid1 = 137L
-    val seq1 = 0
-    val pid2 = 983L
-    val seq2 = 0
-
-    // add some transactional records
-    val firstAppendInfo = 
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid1, epoch, seq1,
-      new SimpleRecord("a".getBytes),
-      new SimpleRecord("b".getBytes),
-      new SimpleRecord("c".getBytes)), 0)
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // mix in some non-transactional data
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord("g".getBytes),
-      new SimpleRecord("h".getBytes),
-      new SimpleRecord("i".getBytes)), 0)
-
-    // append data from a second transactional producer
-    val secondAppendInfo = 
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid2, epoch, seq2,
-      new SimpleRecord("d".getBytes),
-      new SimpleRecord("e".getBytes),
-      new SimpleRecord("f".getBytes)), 0)
-
-    // LSO should not have changed
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // now first producer's transaction is aborted
-    val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, 
epoch, ControlRecordType.ABORT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-
-    // LSO should now point to one less than the first offset of the second 
transaction
-    assertEquals(Optional.of(secondAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // commit the second transaction
-    val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, 
epoch, ControlRecordType.COMMIT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
-
-    // now there should be no first unstable offset
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-  }
-
-  @Test
-  def testAbortedTransactionSpanningMultipleSegments(): Unit = {
-    val pid = 137L
-    val epoch = 5.toShort
-    var seq = 0
-
-    val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
-      new SimpleRecord("a".getBytes),
-      new SimpleRecord("b".getBytes),
-      new SimpleRecord("c".getBytes))
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes)
-    val log = createLog(logDir, logConfig)
-
-    val firstAppendInfo = log.appendAsLeader(records, 0)
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-
-    // this write should spill to the second segment
-    seq = 3
-    
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
-      new SimpleRecord("d".getBytes),
-      new SimpleRecord("e".getBytes),
-      new SimpleRecord("f".getBytes)), 0)
-    assertEquals(Optional.of(firstAppendInfo.firstOffset), 
log.firstUnstableOffset)
-    assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
-
-    // now abort the transaction
-    val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, 
epoch, ControlRecordType.ABORT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-
-    // now check that a fetch includes the aborted transaction
-    val fetchDataInfo = log.read(0L, 2048, FetchIsolation.TXN_COMMITTED, true)
-
-    assertTrue(fetchDataInfo.abortedTransactions.isPresent)
-    assertEquals(1, fetchDataInfo.abortedTransactions.get.size)
-    assertEquals(new 
FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), 
fetchDataInfo.abortedTransactions.get.get(0))
-  }
-
-  @Test
-  def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = {
-    val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
-    val logDir = new File(tmpDir, dirName)
-    logDir.mkdirs()
-    val logConfig = LogTestUtils.createLogConfig()
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments)
-  }
-
-  @Test
-  def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = 512,
-      segmentIndexBytes = 1000,
-      retentionMs = 999
-    )
-    val log = createLog(logDir, logConfig)
-
-    val expiredTimestamp = mockTime.milliseconds() - 1000
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes, 
timestamp = expiredTimestamp)
-      log.appendAsLeader(records, 0)
-    }
-
-    val initialHighWatermark = log.updateHighWatermark(25L)
-    assertEquals(25L, initialHighWatermark)
-
-    val initialNumSegments = log.numberOfSegments
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertTrue(log.numberOfSegments < initialNumSegments)
-    assertTrue(log.logStartOffset <= initialHighWatermark)
-  }
-
-  @Test
-  def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = 512,
-      segmentIndexBytes = 1000,
-      retentionMs = 999
-    )
-    val log = createLog(logDir, logConfig)
-
-    val expiredTimestamp = mockTime.milliseconds() - 1000
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes, 
timestamp = expiredTimestamp)
-      log.appendAsLeader(records, 0)
-    }
-
-    // ensure we have at least a few segments so the test case is not trivial
-    assertTrue(log.numberOfSegments > 5)
-    assertEquals(0L, log.highWatermark)
-    assertEquals(0L, log.logStartOffset)
-    assertEquals(100L, log.logEndOffset)
-
-    for (hw <- 0 to 100) {
-      log.updateHighWatermark(hw)
-      assertEquals(hw, log.highWatermark)
-      log.deleteOldSegments()
-      assertTrue(log.logStartOffset <= hw)
-
-      // verify that all segments up to the high watermark have been deleted
-      log.logSegments.asScala.headOption.foreach { segment =>
-        assertTrue(segment.baseOffset <= hw)
-        assertTrue(segment.baseOffset >= log.logStartOffset)
-      }
-      log.logSegments.asScala.tail.foreach { segment =>
-        assertTrue(segment.baseOffset > hw)
-        assertTrue(segment.baseOffset >= log.logStartOffset)
-      }
-    }
-
-    assertEquals(100L, log.logStartOffset)
-    assertEquals(1, log.numberOfSegments)
-    assertEquals(0, log.activeSegment.size)
-  }
-
-  @Test
-  def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = 512,
-      segmentIndexBytes = 1000,
-      retentionMs = 999
-    )
-    val log = createLog(logDir, logConfig)
-
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
-      log.appendAsLeader(records, 0)
-    }
-
-    log.updateHighWatermark(25L)
-    assertThrows(classOf[OffsetOutOfRangeException], () => 
log.maybeIncrementLogStartOffset(26L, 
LogStartOffsetIncrementReason.ClientRecordDeletion))
-  }
-
-  @Test
-  def testBackgroundDeletionWithIOException(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
-    // Delete the underlying directory to trigger a KafkaStorageException
-    val dir = log.dir
-    Utils.delete(dir)
-    Files.createFile(dir.toPath)
-
-    assertThrows(classOf[KafkaStorageException], () => {
-      log.delete()
-    })
-    assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
-  }
-
-  /**
-   * test renaming a log's dir without reinitialization, which is the case 
during topic deletion
-   */
-  @Test
-  def testRenamingDirWithoutReinitialization(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
-    val newDir = TestUtils.randomPartitionLogDir(tmpDir)
-    assertTrue(newDir.exists())
-
-    log.renameDir(newDir.getName, false)
-    assertFalse(log.leaderEpochCache.nonEmpty)
-    assertTrue(log.partitionMetadataFile.isEmpty)
-    assertEquals(0, log.logEndOffset)
-
-    // verify that the background deletion can succeed
-    log.delete()
-    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
-    assertFalse(newDir.exists())
-  }
-
   @Test
   def testMaybeUpdateHighWatermarkAsFollower(): Unit = {
     val logConfig = LogTestUtils.createLogConfig()
@@ -1650,47 +972,6 @@ class UnifiedLogTest {
     assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), 
result)
   }
 
-  private def appendTransactionalToBuffer(buffer: ByteBuffer,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          leaderEpoch: Int = 0): (Long, Int) 
=> Unit = {
-    var sequence = 0
-    (offset: Long, numRecords: Int) => {
-      val builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
-        offset, mockTime.milliseconds(), producerId, producerEpoch, sequence, 
true, leaderEpoch)
-      for (seq <- sequence until sequence + numRecords) {
-        val record = new SimpleRecord(s"$seq".getBytes)
-        builder.append(record)
-      }
-
-      sequence += numRecords
-      builder.close()
-    }
-  }
-
-  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
-                                         producerId: Long,
-                                         producerEpoch: Short,
-                                         offset: Long,
-                                         controlType: ControlRecordType,
-                                         coordinatorEpoch: Int = 0,
-                                         leaderEpoch: Int = 0): Unit = {
-    val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, 
mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker)
-  }
-
-  private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, 
numRecords: Int): Unit = {
-    val builder = MemoryRecords.builder(buffer, Compression.NONE, 
TimestampType.CREATE_TIME, offset)
-    (0 until numRecords).foreach { seq =>
-      builder.append(new SimpleRecord(s"$seq".getBytes))
-    }
-    builder.close()
-  }
-
-  private def appendAsFollower(log: UnifiedLog, records: MemoryRecords, 
leaderEpoch: Int): Unit = {
-    records.batches.forEach(_.setPartitionLeaderEpoch(leaderEpoch))
-    log.appendAsFollower(records, leaderEpoch)
-  }
 
   private def createLog(dir: File,
                         config: LogConfig,
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index e9efb9e5af7..67f8cb61e50 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.DefaultRecordBatch;
@@ -204,6 +205,14 @@ public class LogTestUtils {
         return records(records, magicValue, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
     }
 
+    public static List<AbortedTxn> allAbortedTransactions(UnifiedLog log) {
+        List<AbortedTxn> result = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            result.addAll(segment.txnIndex().allAbortedTxns());
+        }
+        return result;
+    }
+
     public static void deleteProducerSnapshotFiles(File logDir) {
         Stream.of(logDir.listFiles())
                 .filter(f -> f.isFile() && 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index f382fe803a3..bcefc6679aa 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -30,13 +30,16 @@ import 
org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
 import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.CompressionType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.DefaultRecordBatch;
+import org.apache.kafka.common.record.internal.EndTransactionMarker;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
 import org.apache.kafka.common.record.internal.MemoryRecords;
@@ -51,6 +54,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.server.common.TransactionVersion;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
@@ -4343,4 +4347,667 @@ public class UnifiedLogTest {
                 .getValue();
         return gauge.value();
     }
+
+    @Test
+    public void testTransactionIndexUpdatedThroughReplication() throws 
IOException {
+        short epoch = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        long pid1 = 1L;
+        long pid2 = 2L;
+        long pid3 = 3L;
+        long pid4 = 4L;
+
+        BiConsumer<Long, Integer> appendPid1 = 
appendTransactionalToBuffer(buffer, pid1, epoch);
+        BiConsumer<Long, Integer> appendPid2 = 
appendTransactionalToBuffer(buffer, pid2, epoch);
+        BiConsumer<Long, Integer> appendPid3 = 
appendTransactionalToBuffer(buffer, pid3, epoch);
+        BiConsumer<Long, Integer> appendPid4 = 
appendTransactionalToBuffer(buffer, pid4, epoch);
+
+        appendPid1.accept(0L, 5);
+        appendNonTransactionalToBuffer(buffer, 5L, 3);
+        appendPid2.accept(8L, 2);
+        appendPid1.accept(10L, 4);
+        appendPid3.accept(14L, 3);
+        appendNonTransactionalToBuffer(buffer, 17L, 2);
+        appendPid1.accept(19L, 10);
+        appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L, 
ControlRecordType.ABORT);
+        appendPid2.accept(30L, 6);
+        appendPid4.accept(36L, 3);
+        appendNonTransactionalToBuffer(buffer, 39L, 10);
+        appendPid3.accept(49L, 9);
+        appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L, 
ControlRecordType.COMMIT);
+        appendPid4.accept(59L, 8);
+        appendPid2.accept(67L, 7);
+        appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L, 
ControlRecordType.ABORT);
+        appendNonTransactionalToBuffer(buffer, 75L, 10);
+        appendPid4.accept(85L, 4);
+        appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L, 
ControlRecordType.COMMIT);
+
+        buffer.flip();
+
+        appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch);
+
+        List<AbortedTxn> abortedTransactions = 
LogTestUtils.allAbortedTransactions(log);
+        List<AbortedTxn> expectedTransactions = List.of(
+            new 
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+            new 
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
+        );
+
+        assertEquals(expectedTransactions, abortedTransactions);
+
+        // Verify caching of the segment position of the first unstable offset
+        log.updateHighWatermark(30L);
+        assertCachedFirstUnstableOffset(log, 8L);
+
+        log.updateHighWatermark(75L);
+        assertCachedFirstUnstableOffset(log, 36L);
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testZombieCoordinatorFenced() throws IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        Consumer<Integer> append = 
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        append.accept(10);
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+            1, 0, TransactionVersion.TV_0.featureLevel());
+
+        append.accept(5);
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.COMMIT, mockTime.milliseconds(),
+            2, 0, TransactionVersion.TV_0.featureLevel());
+
+        assertThrows(TransactionCoordinatorFencedException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+    }
+
+    @Test
+    public void testZombieCoordinatorFencedEmptyTransaction() throws 
IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        BiConsumer<Long, Integer> append = appendTransactionalToBuffer(buffer, 
pid, epoch, 1);
+        append.accept(0L, 10);
+        appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, 
ControlRecordType.COMMIT, 1);
+
+        buffer.flip();
+        log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch);
+
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                2, 1, TransactionVersion.TV_0.featureLevel());
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+            2, 1, TransactionVersion.TV_0.featureLevel());
+        assertThrows(TransactionCoordinatorFencedException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 1, TransactionVersion.TV_0.featureLevel()));
+    }
+
+    @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with 
transactionVersion={0}")
+    @ValueSource(shorts = {0, 1, 2})
+    public void testEndTxnWithFencedProducerEpoch(short transactionVersion) 
throws IOException {
+        long producerId = 1L;
+        short epoch = 5;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // First, write some transactional records to establish the current 
epoch
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+        // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 
and TV2
+        // TV0/TV1: markerEpoch < currentEpoch is rejected
+        // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
+        assertThrows(InvalidProducerEpochException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, 
(short) (epoch - 1),
+                ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, 
transactionVersion));
+
+        // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
+        // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
+        // TV2: same epoch is rejected (requires strict >, markerEpoch > 
currentEpoch)
+        if (transactionVersion >= 2) {
+            // TV2: same epoch should be rejected
+            assertThrows(InvalidProducerEpochException.class,
+                () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, 
epoch,
+                    ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, 
transactionVersion));
+        } else {
+            // TV0/TV1: same epoch should be allowed
+            assertDoesNotThrow(() -> 
LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch,
+                ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, 
transactionVersion));
+        }
+    }
+
+    @Test
+    public void testTV2MarkerWithBumpedEpochSucceeds() throws IOException {
+        // Test that TV2 markers with bumped epochs (epoch + 1) are accepted 
(positive case)
+        // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so 
markerEpoch = currentEpoch + 1
+        short transactionVersion = 2;
+        long producerId = 1L;
+        short epoch = 5;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // First, write some transactional records to establish the current 
epoch
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+        // TV2: Verify that bumped epoch (epoch + 1) is accepted
+        short bumpedEpoch = (short) (epoch + 1);
+        assertDoesNotThrow(() -> LogTestUtils.appendEndTxnMarkerAsLeader(log, 
producerId, bumpedEpoch,
+            ControlRecordType.COMMIT, mockTime.milliseconds(), 1,
+            0, TransactionVersion.TV_2.featureLevel()));
+
+        // Verify the marker was successfully appended by checking producer 
state
+        ProducerStateEntry producerState = 
log.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(producerState);
+        // After a commit marker, the producer epoch should be updated to the 
bumped epoch for TV2
+        assertEquals(bumpedEpoch, producerState.producerEpoch());
+    }
+
+    @Test
+    public void testReplicationWithTVUnknownAllowed() throws IOException {
+        // Test that TV_UNKNOWN is allowed for replication (REPLICATION 
origin) and uses TV_0 validation
+        // This simulates the scenario where:
+        // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2 
and validates with strict TV2 rules
+        // 2. Leader writes MemoryRecords to log (transactionVersion is not 
stored in MemoryRecords)
+        // 3. Follower receives MemoryRecords via replication (without 
transactionVersion metadata)
+        // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more 
permissive, safe because leader already validated)
+        long producerId = 1L;
+        short epoch = 5;
+        int coordinatorEpoch = 1;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // Step 1: Write transactional records as leader to establish current 
epoch
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel());
+
+        // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch 
+ 1)
+        // This is what happens at the leader when WriteTxnMarkersRequest is 
received
+        short bumpedEpoch = (short) (epoch + 1);
+        MemoryRecords leaderMarker = MemoryRecords.withEndTransactionMarker(
+            mockTime.milliseconds(),
+            producerId,
+            bumpedEpoch,
+            new EndTransactionMarker(ControlRecordType.COMMIT, 
coordinatorEpoch)
+        );
+        // Leader validates with TV2 (strict: markerEpoch > currentEpoch)
+        log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel());
+
+        // Verify leader state
+        ProducerStateEntry leaderProducerState = 
log.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(leaderProducerState);
+        assertEquals(bumpedEpoch, leaderProducerState.producerEpoch());
+
+        // Step 3: Create a new log to simulate a follower
+        File followerLogDir = TestUtils.randomPartitionLogDir(tmpDir);
+        UnifiedLog followerLog = createLog(followerLogDir, logConfig);
+
+        // Step 4: Follower replicates transactional records first
+        MemoryRecords followerTransactionalRecords = 
MemoryRecords.withTransactionalRecords(
+            0L, Compression.NONE, producerId, epoch, 0, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        followerLog.appendAsFollower(followerTransactionalRecords, 0);
+
+        // Step 5: Follower replicates the marker (appendAsFollower uses 
TV_UNKNOWN internally)
+        // This should succeed because TV_UNKNOWN is allowed for REPLICATION 
origin
+        // and defaults to TV_0 validation (markerEpoch >= currentEpoch), 
which is more permissive
+        // The marker should be at offset 1 (after the transactional record at 
offset 0)
+        MemoryRecords followerMarker = MemoryRecords.withEndTransactionMarker(
+            1L, // offset after the transactional record
+            mockTime.milliseconds(),
+            0, // partition leader epoch
+            producerId,
+            bumpedEpoch,
+            new EndTransactionMarker(ControlRecordType.COMMIT, 
coordinatorEpoch)
+        );
+
+        // This should not throw an exception - TV_UNKNOWN is allowed for 
replication
+        assertDoesNotThrow(() -> followerLog.appendAsFollower(followerMarker, 
0));
+
+        // Verify follower state matches leader state
+        ProducerStateEntry followerProducerState = 
followerLog.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(followerProducerState);
+        assertEquals(bumpedEpoch, followerProducerState.producerEpoch());
+        assertEquals(coordinatorEpoch, 
followerProducerState.coordinatorEpoch());
+
+        // Verify the marker was written to the follower log
+        assertEquals(2L, followerLog.logEndOffset()); // 1 transactional 
record + 1 marker
+    }
+
+    @Test
+    public void testLeaderRejectsTVUnknownForTransactionMarker() throws 
IOException {
+        // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader 
writing transaction markers)
+        // TV_UNKNOWN is only allowed for REPLICATION origin (followers)
+        long producerId = 1L;
+        short epoch = 5;
+        int coordinatorEpoch = 1;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // Write transactional records as leader to establish current epoch
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel());
+
+        // Attempt to write a transaction marker with TV_UNKNOWN as 
COORDINATOR (leader)
+        // This should throw IllegalArgumentException because TV_UNKNOWN is 
not allowed for COORDINATOR origin
+        MemoryRecords marker = MemoryRecords.withEndTransactionMarker(
+            mockTime.milliseconds(),
+            producerId,
+            (short) (epoch + 1), // bumped epoch for TV2
+            new EndTransactionMarker(ControlRecordType.COMMIT, 
coordinatorEpoch)
+        );
+
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () ->
+            log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_UNKNOWN));
+
+        assertTrue(exception.getMessage().contains("transactionVersion must be 
explicitly specified"));
+        assertTrue(exception.getMessage().contains("TV_UNKNOWN"));
+        assertTrue(exception.getMessage().contains("COORDINATOR"));
+    }
+
+    @Test
+    public void testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment() 
throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        short epoch = 0;
+        long pid = 1L;
+        Consumer<Integer> appendPid = 
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        appendPid.accept(5);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+        assertEquals(8L, log.logEndOffset());
+
+        log.roll();
+        assertEquals(2, log.logSegments().size());
+        appendPid.accept(5);
+
+        assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(5L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        // the first unstable offset should be lower bounded by the log start 
offset
+        assertEquals(Optional.of(5L), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void 
testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        short epoch = 0;
+        long pid = 1L;
+        Consumer<Integer> appendPid = 
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        appendPid.accept(5);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+        assertEquals(8L, log.logEndOffset());
+
+        log.roll();
+        assertEquals(2, log.logSegments().size());
+        appendPid.accept(5);
+
+        assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(8L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.logSegments().size());
+
+        // the first unstable offset should be lower bounded by the log start 
offset
+        assertEquals(Optional.of(8L), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testAppendToTransactionIndexFailure() throws IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        Consumer<Integer> append = 
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+        append.accept(10);
+
+        // Kind of a hack, but renaming the index to a directory ensures that 
the append
+        // to the index will fail.
+        log.activeSegment().txnIndex().renameTo(log.dir());
+
+        // The append will be written to the log successfully, but the write 
to the index will fail
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertEquals(11L, log.logEndOffset());
+        assertEquals(0L, log.lastStableOffset());
+
+        // Try the append a second time. The appended offset in the log should 
not increase
+        // because the log dir is marked as failed.  Nor will there be a write 
to the transaction
+        // index.
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertEquals(11L, log.logEndOffset());
+        assertEquals(0L, log.lastStableOffset());
+
+        // Even if the high watermark is updated, the first unstable offset 
does not move
+        log.updateHighWatermark(12L);
+        assertEquals(0L, log.lastStableOffset());
+
+        assertThrows(KafkaStorageException.class, () -> log.close());
+        UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L, 
brokerTopicStats, mockTime.scheduler, mockTime,
+            producerStateManagerConfig, false, Optional.empty(), false);
+        assertEquals(11L, reopenedLog.logEndOffset());
+        assertEquals(1, 
reopenedLog.activeSegment().txnIndex().allAbortedTxns().size());
+        reopenedLog.updateHighWatermark(12L);
+        assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+    }
+
+    @Test
+    public void testOffsetSnapshot() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // append a few records
+        appendAsFollower(
+            log,
+            MemoryRecords.withRecords(
+                Compression.NONE,
+                new SimpleRecord("a".getBytes()),
+                new SimpleRecord("b".getBytes()),
+                new SimpleRecord("c".getBytes())
+            ),
+            5
+        );
+
+        log.updateHighWatermark(3L);
+        LogOffsetSnapshot offsets = log.fetchOffsetSnapshot();
+        assertEquals(3L, offsets.highWatermark().messageOffset);
+        assertFalse(offsets.highWatermark().messageOffsetOnly());
+
+        offsets = log.fetchOffsetSnapshot();
+        assertEquals(3L, offsets.highWatermark().messageOffset);
+        assertFalse(offsets.highWatermark().messageOffsetOnly());
+    }
+
+    @Test
+    public void testLastStableOffsetWithMixedProducerData() throws IOException 
{
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // for convenience, both producers share the same epoch
+        short epoch = 5;
+
+        long pid1 = 137L;
+        int seq1 = 0;
+        long pid2 = 983L;
+        int seq2 = 0;
+
+        // add some transactional records
+        LogAppendInfo firstAppendInfo = 
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid1, epoch, seq1,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // mix in some non-transactional data
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord("g".getBytes()),
+            new SimpleRecord("h".getBytes()),
+            new SimpleRecord("i".getBytes())), 0);
+
+        // append data from a second transactional producer
+        LogAppendInfo secondAppendInfo = 
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid2, epoch, seq2,
+            new SimpleRecord("d".getBytes()),
+            new SimpleRecord("e".getBytes()),
+            new SimpleRecord("f".getBytes())), 0);
+
+        // LSO should not have changed
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // now first producer's transaction is aborted
+        LogAppendInfo abortAppendInfo = 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch, 
ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+        // the first unstable offset should now be the first offset of the 
second transaction
+        assertEquals(Optional.of(secondAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // commit the second transaction
+        LogAppendInfo commitAppendInfo = 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch, 
ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+        // now there should be no first unstable offset
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testAbortedTransactionSpanningMultipleSegments() throws 
IOException {
+        long pid = 137L;
+        short epoch = 5;
+        int seq = 0;
+
+        MemoryRecords records = 
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes()));
+
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(records.sizeInBytes()).build();
+        log = createLog(logDir, logConfig);
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+
+        // this write should spill to the second segment
+        
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, 3,
+            new SimpleRecord("d".getBytes()),
+            new SimpleRecord("e".getBytes()),
+            new SimpleRecord("f".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), 
log.firstUnstableOffset());
+        assertEquals(3L, log.logEndOffsetMetadata().segmentBaseOffset);
+
+        // now abort the transaction
+        LogAppendInfo abortAppendInfo = 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+        // now check that a fetch includes the aborted transaction
+        FetchDataInfo fetchDataInfo = log.read(0L, 2048, 
FetchIsolation.TXN_COMMITTED, true);
+
+        assertTrue(fetchDataInfo.abortedTransactions.isPresent());
+        assertEquals(1, fetchDataInfo.abortedTransactions.get().size());
+        assertEquals(new 
FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), 
fetchDataInfo.abortedTransactions.get().get(0));
+    }
+
+    @Test
+    public void testLoadPartitionDirWithNoSegmentsShouldNotThrow() throws 
IOException { // Creating a log over a freshly made partition directory must succeed and report exactly one segment.
+        String dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 
3)); // directory name that logDeleteDirName produces for partition foo-3
+        File testLogDir = new File(tmpDir, dirName);
+        testLogDir.mkdirs(); // nothing is written into the directory before the log is created
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build(); // builder defaults, no overrides
+        UnifiedLog testLog = createLog(testLogDir, logConfig); // the test fails here if createLog throws
+        assertEquals(1, testLog.numberOfSegments()); // exactly one segment is expected after creation
+    }
+
+    @Test
+    public void testSegmentDeletionWithHighWatermarkInitialization() throws 
IOException { // After the high watermark is advanced, deleteOldSegments must remove at least one expired segment.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build(); // 512-byte segments, 999 ms retention
+        log = createLog(logDir, logConfig);
+
+        long expiredTimestamp = mockTime.milliseconds() - 1000; // 1000 ms in the past, i.e. older than the 999 ms retention
+        for (int i = 0; i < 100; i++) { // 100 appends, each stamped with the expired timestamp
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + 
i).getBytes(), Compression.NONE, null, expiredTimestamp);
+            log.appendAsLeader(records, 0);
+        }
+
+        long initialHighWatermark = log.updateHighWatermark(25L); // updateHighWatermark returns the value it applied
+        assertEquals(25L, initialHighWatermark);
+
+        int initialNumSegments = log.numberOfSegments(); // segment count before deletion, for comparison below
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertTrue(log.numberOfSegments() < initialNumSegments); // the segment count must have shrunk
+        assertTrue(log.logStartOffset() <= initialHighWatermark); // the log start offset must not pass the high watermark
+    }
+
+    @Test
+    public void testCannotDeleteSegmentsAtOrAboveHighWatermark() throws 
IOException { // Steps the high watermark from 0 to 100 and checks after each step which segments deletion left behind.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build(); // 512-byte segments, 999 ms retention
+        log = createLog(logDir, logConfig);
+
+        long expiredTimestamp = mockTime.milliseconds() - 1000; // older than the 999 ms retention
+        for (int i = 0; i < 100; i++) { // 100 appends, each stamped with the expired timestamp
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + 
i).getBytes(), Compression.NONE, null, expiredTimestamp);
+            log.appendAsLeader(records, 0);
+        }
+
+        // ensure we have at least a few segments so the test case is not 
trivial
+        assertTrue(log.numberOfSegments() > 5);
+        assertEquals(0L, log.highWatermark()); // preconditions: nothing deleted yet, 100 offsets written
+        assertEquals(0L, log.logStartOffset());
+        assertEquals(100L, log.logEndOffset());
+
+        for (int hw = 0; hw <= 100; hw++) { // advance the high watermark one offset at a time, up to the log end offset
+            log.updateHighWatermark(hw);
+            assertEquals(hw, log.highWatermark());
+            log.deleteOldSegments(); // run deletion at this high watermark; the return value is not checked here
+            assertTrue(log.logStartOffset() <= hw); // the log start offset never passes the high watermark
+
+            // verify that all segments up to the high watermark have been 
deleted
+            List<LogSegment> segments = log.logSegments();
+            if (!segments.isEmpty()) { // first remaining segment: base offset lies between the log start offset and hw
+                assertTrue(segments.get(0).baseOffset() <= hw);
+                assertTrue(segments.get(0).baseOffset() >= 
log.logStartOffset());
+            }
+            for (int i = 1; i < segments.size(); i++) { // every later segment must start above hw
+                assertTrue(segments.get(i).baseOffset() > hw);
+                assertTrue(segments.get(i).baseOffset() >= 
log.logStartOffset());
+            }
+        }
+
+        assertEquals(100L, log.logStartOffset()); // final state once hw equals the log end offset
+        assertEquals(1, log.numberOfSegments()); // only one segment remains
+        assertEquals(0, log.activeSegment().size()); // and the active segment holds no bytes
+    }
+
+    @Test
+    public void testCannotIncrementLogStartOffsetPastHighWatermark() throws 
IOException { // Moving the log start offset above the high watermark must be rejected with OffsetOutOfRangeException.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).build(); // 512-byte segments; no retention override
+        log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 100; i++) { // 100 appends, so the requested offset 26 lies inside the written range
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + 
i).getBytes(), null);
+            log.appendAsLeader(records, 0);
+        }
+
+        log.updateHighWatermark(25L); // high watermark is 25, so 26 is one past it
+        assertThrows(OffsetOutOfRangeException.class,
+            () -> log.maybeIncrementLogStartOffset(26L, 
LogStartOffsetIncrementReason.ClientRecordDeletion));
+    }
+
+    @Test
+    public void testBackgroundDeletionWithIOException() throws IOException { // A delete() that hits a broken log directory must throw KafkaStorageException and flag the dir offline.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build(); // 1 MiB segments
+        log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "The number of segments should 
be 1");
+
+        // Delete the underlying directory to trigger a KafkaStorageException
+        File dir = log.dir();
+        Utils.delete(dir);
+        Files.createFile(dir.toPath()); // put a regular file at the path where the log directory used to be
+
+        assertThrows(KafkaStorageException.class, () -> log.delete());
+        
assertTrue(log.logDirFailureChannel().hasOfflineLogDir(tmpDir.toString())); // the failure channel must report tmpDir as offline
+    }
+
+    /**
+     * test renaming a log's dir without reinitialization, which is the case 
during topic deletion
+     */
+    @Test
+    public void testRenamingDirWithoutReinitialization() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build(); // 1 MiB segments
+        log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "The number of segments should 
be 1");
+
+        File newDir = TestUtils.randomPartitionLogDir(tmpDir); // target directory under tmpDir
+        assertTrue(newDir.exists()); // the helper is expected to have created the directory already
+
+        log.renameDir(newDir.getName(), false); // second argument false: presumably skips reinitialization, per the test name - confirm against UnifiedLog.renameDir
+        assertFalse(log.leaderEpochCache().nonEmpty()); // leader epoch cache has no entries
+        assertTrue(log.partitionMetadataFile().isEmpty()); // no partition metadata file is present (looks like an Optional - confirm)
+        assertEquals(0, log.logEndOffset()); // nothing was appended in this test
+
+        // verify that the background deletion can succeed
+        log.delete();
+        assertEquals(0, log.numberOfSegments(), "The number of segments should 
be 0");
+        assertFalse(newDir.exists()); // the renamed directory is gone after delete()
+    }
+
+    private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer 
buffer, long producerId, short producerEpoch) { // Convenience overload: same as the four-argument version with leaderEpoch fixed to 0.
+        return appendTransactionalToBuffer(buffer, producerId, producerEpoch, 
0);
+    }
+
+    private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer 
buffer, long producerId, short producerEpoch, int leaderEpoch) { // Returns an (offset, numRecords) appender that writes one batch per call into buffer, continuing the sequence numbers across calls.
+        int[] sequence = {0}; // single-element array so the lambda below can update the running sequence number
+        return (offset, numRecords) -> {
+            int baseSequence = sequence[0]; // this batch starts where the previous call left off
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
+                offset, mockTime.milliseconds(), producerId, producerEpoch, 
baseSequence, true, leaderEpoch); // the literal true is presumably the isTransactional flag - confirm against MemoryRecords.builder
+            for (int seq = baseSequence; seq < baseSequence + numRecords; 
seq++) {
+                builder.append(new 
SimpleRecord(String.valueOf(seq).getBytes())); // record value is the decimal sequence number
+            }
+            sequence[0] += numRecords; // advance the running sequence for the next call
+            builder.close(); // finish this batch; presumably this is what completes the write into buffer - confirm
+        };
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long 
producerId, short producerEpoch,
+                                             long offset, ControlRecordType 
controlType) { // Convenience overload: coordinatorEpoch and leaderEpoch both default to 0.
+        appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset, 
controlType, 0, 0);
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long 
producerId, short producerEpoch,
+                                             long offset, ControlRecordType 
controlType,
+                                             int coordinatorEpoch) { // Convenience overload: leaderEpoch defaults to 0.
+        appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset, 
controlType, coordinatorEpoch, 0);
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long 
producerId, short producerEpoch,
+                                             long offset, ControlRecordType 
controlType,
+                                             int coordinatorEpoch, int 
leaderEpoch) { // Writes one end-transaction marker for the given producer at the given offset into buffer.
+        EndTransactionMarker marker = new EndTransactionMarker(controlType, 
coordinatorEpoch); // marker carries the control type passed by the caller plus the coordinator epoch
+        MemoryRecords.writeEndTransactionalMarker(buffer, offset, 
mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker); // timestamp is taken from the mock clock
+    }
+
+    private void appendNonTransactionalToBuffer(ByteBuffer buffer, long 
offset, int numRecords) { // Writes one batch of numRecords records at the given base offset; no producer id or epoch is supplied.
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE, TimestampType.CREATE_TIME, offset);
+        for (int seq = 0; seq < numRecords; seq++) {
+            builder.append(new SimpleRecord(String.valueOf(seq).getBytes())); // record value is the decimal loop index
+        }
+        builder.close(); // finish the batch; presumably this is what completes the write into buffer - confirm
+    }
 }

Reply via email to