This is an automated email from the ASF dual-hosted git repository.

payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb3d6fb546f KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21763)
fb3d6fb546f is described below

commit fb3d6fb546ffd1eec4d21407e13056247c7d19b1
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 17 22:15:58 2026 +0800

    KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21763)
    
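    Moves the tests from testRetentionIdempotency through testLogRollAfterLogHandlerClosed from the Scala UnifiedLogTest (core) to the Java UnifiedLogTest (storage).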
    
    Reviewers: PoAn Yang <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 675 +------------------
 .../kafka/storage/internals/log/LogTestUtils.java  |  43 ++
 .../storage/internals/log/UnifiedLogTest.java      | 734 ++++++++++++++++++++-
 4 files changed, 777 insertions(+), 677 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3008a79ce34..8d00d2dca5b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -305,7 +305,7 @@
     <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
               
files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest|UnifiedLogTest).java"/>
     <suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
-    <suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>
+    <suppress checks="JavaNCSS" 
files="(RemoteLogManagerTest|UnifiedLogTest).java"/>
 
     <!-- benchmarks -->
     <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 63cb90ab3cd..b01375e59ae 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -41,8 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
UnexpectedAppendOffs
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, 
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, 
UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.internals.utils.Throttler
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
AsyncOffsetReader, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, 
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -58,7 +57,7 @@ import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.util
 import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
-import java.util.{Optional, OptionalLong, Properties}
+import java.util.{Optional, Properties}
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
@@ -92,676 +91,6 @@ class UnifiedLogTest {
     }
   }
 
-  @Test
-  def testRetentionIdempotency(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes))), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), 0)
-
-    mockTime.sleep(901)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(1L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    assertEquals(2, log.deleteOldSegments(),
-      "Expecting two segment deletions as log start offset retention should 
unblock time based retention")
-    assertEquals(0, log.deleteOldSegments())
-  }
-
-
-  @Test
-  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val epoch = 0.toShort
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 1), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 2), 0)
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
-    // Increment the log start offset to exclude the first two segments.
-    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
-    mockTime.sleep(1)
-    assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
-      "expect a single producer state snapshot remaining")
-  }
-
-  @Test
-  def testCompactionDeletesProducerStateSnapshots(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = TopicConfig.CLEANUP_POLICY_COMPACT, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val epoch = 0.toShort
-    val cleaner = new Cleaner(0,
-      new FakeOffsetMap(Int.MaxValue),
-      64 * 1024,
-      64 * 1024,
-      0.75,
-      new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", 
mockTime),
-      mockTime,
-      tp => {})
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 1), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 2), 0)
-    log.updateHighWatermark(log.logEndOffset)
-    
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
-      "expected a snapshot file per segment base offset, except the first 
segment")
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
-    // Clean segments, this should delete everything except the active segment 
since there only
-    // exists the key "a".
-    cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
-    // There is no other key so we don't delete anything
-    assertEquals(0, log.deleteOldSegments())
-    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
-    mockTime.sleep(1)
-    
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
-      "expected a snapshot file per segment base offset, excluding the first")
-  }
-
-  /**
-   * After loading the log, producer state is truncated such that there are no 
producer state snapshot files which
-   * exceed the log end offset. This test verifies that these are removed.
-   */
-  @Test
-  def testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset(): Unit = {
-    val straySnapshotFile = LogFileUtils.producerSnapshotFile(logDir, 
42).toPath
-    Files.createFile(straySnapshotFile)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
-    createLog(logDir, logConfig)
-    assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size,
-      "expected producer state snapshots greater than the log end offset to be 
cleaned up")
-  }
-
-  @Test
-  def testProducerIdMapTruncateFullyAndStartAt(): Unit = {
-    val records = TestUtils.singletonRecords("foo".getBytes)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(records, 0)
-    log.takeProducerSnapshot()
-
-    log.appendAsLeader(TestUtils.singletonRecords("bar".getBytes), 0)
-    log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), 0)
-    log.takeProducerSnapshot()
-
-    assertEquals(3, log.logSegments.size)
-    assertEquals(3, log.latestProducerStateEndOffset)
-    assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset)
-
-    log.truncateFullyAndStartAt(29, Optional.empty)
-    assertEquals(1, log.logSegments.size)
-    assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset)
-    assertEquals(29, log.latestProducerStateEndOffset)
-  }
-
-  @Test
-  def testProducerIdExpirationOnSegmentDeletion(): Unit = {
-    val pid1 = 1L
-    val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), 
producerId = pid1, producerEpoch = 0, sequence = 0)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(records, 0)
-    log.takeProducerSnapshot()
-
-    val pid2 = 2L
-    log.appendAsLeader(TestUtils.records(Seq(new 
SimpleRecord("bar".getBytes)), producerId = pid2, producerEpoch = 0, sequence = 
0),
-      0)
-    log.appendAsLeader(TestUtils.records(Seq(new 
SimpleRecord("baz".getBytes)), producerId = pid2, producerEpoch = 0, sequence = 
1),
-      0)
-    log.takeProducerSnapshot()
-
-    assertEquals(3, log.logSegments.size)
-    assertEquals(util.Set.of(pid1, pid2), 
log.activeProducersWithLastSequence.keySet)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-
-    // Producer state should not be removed when deleting log segment
-    assertEquals(2, log.logSegments.size)
-    assertEquals(util.Set.of(pid1, pid2), 
log.activeProducersWithLastSequence.keySet)
-  }
-
-  @Test
-  def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint(): Unit 
= {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), 0)
-    log.roll(Optional.of(1L))
-    assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
-    assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
-    log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), 0)
-    log.roll(Optional.of(2L))
-    assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset)
-    assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
-    log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), 0)
-    log.roll(Optional.of(3L))
-    assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset)
-
-    // roll triggers a flush at the starting offset of the new segment, we 
should retain all snapshots
-    assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset)
-
-    // even if we flush within the active segment, the snapshot should remain
-    log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), 0)
-    log.flushUptoOffsetExclusive(4L)
-    assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset)
-  }
-
-  @Test
-  def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = {
-    val producerId = 1L
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(TestUtils.records(Seq(new 
SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
-      producerId = producerId, producerEpoch = 0, sequence = 0),
-      0)
-
-    // The next append should overflow the segment and cause it to roll
-    log.appendAsLeader(TestUtils.records(Seq(new 
SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))),
-      producerId = producerId, producerEpoch = 0, sequence = 1),
-      0)
-
-    assertEquals(2, log.logSegments.size)
-    assertEquals(1L, log.activeSegment.baseOffset)
-    assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
-
-    // Force a reload from the snapshot to check its consistency
-    log.truncateTo(1L)
-
-    assertEquals(2, log.logSegments.size)
-    assertEquals(1L, log.activeSegment.baseOffset)
-    assertTrue(log.activeSegment.log.batches.asScala.isEmpty)
-    assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset)
-
-    val lastEntry = log.producerStateManager.lastEntry(producerId)
-    assertTrue(lastEntry.isPresent)
-    assertEquals(0L, lastEntry.get.firstDataOffset)
-    assertEquals(0L, lastEntry.get.lastDataOffset)
-  }
-
-  @Test
-  def testRebuildTransactionalState(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val pid = 137L
-    val epoch = 5.toShort
-    val seq = 0
-
-    // add some transactional records
-    val records = MemoryRecords.withTransactionalRecords(Compression.NONE, 
pid, epoch, seq,
-      new SimpleRecord("foo".getBytes),
-      new SimpleRecord("bar".getBytes),
-      new SimpleRecord("baz".getBytes))
-    log.appendAsLeader(records, 0)
-    val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, 
epoch, ControlRecordType.ABORT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-    log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-
-    // now there should be no first unstable offset
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-
-    log.close()
-
-    val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
-    reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-    assertEquals(Optional.empty, reopenedLog.firstUnstableOffset)
-  }
-
-  @Test
-  def testPeriodicProducerIdExpiration(): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(200, false)
-    val producerIdExpirationCheckIntervalMs = 100
-
-    val pid = 23L
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig,
-      producerIdExpirationCheckIntervalMs = 
producerIdExpirationCheckIntervalMs)
-    val records = Seq(new SimpleRecord(mockTime.milliseconds(), 
"foo".getBytes))
-    log.appendAsLeader(TestUtils.records(records, producerId = pid, 
producerEpoch = 0, sequence = 0), 0)
-
-    assertEquals(util.Set.of(pid), log.activeProducersWithLastSequence.keySet)
-
-    mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(util.Set.of(pid), log.activeProducersWithLastSequence.keySet)
-
-    mockTime.sleep(producerIdExpirationCheckIntervalMs)
-    assertEquals(util.Set.of(), log.activeProducersWithLastSequence.keySet)
-  }
-
-  @Test
-  def testDuplicateAppends(): Unit = {
-    // create a log
-    val log = createLog(logDir, new LogConfig(new Properties))
-    val pid = 1L
-    val epoch: Short = 0
-
-    var seq = 0
-    // Pad the beginning of the log.
-    for (_ <- 0 to 5) {
-      val record = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
-        producerId = pid, producerEpoch = epoch, sequence = seq)
-      log.appendAsLeader(record, 0)
-      seq = seq + 1
-    }
-    // Append an entry with multiple log records.
-    def createRecords = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
-      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
-      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes)
-    ), producerId = pid, producerEpoch = epoch, sequence = seq)
-    val multiEntryAppendInfo = log.appendAsLeader(createRecords, 0)
-    assertEquals(
-      multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1,
-      3,
-      "should have appended 3 entries"
-    )
-
-    // Append a Duplicate of the tail, when the entry at the tail has multiple 
records.
-    val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, 0)
-    assertEquals(
-      multiEntryAppendInfo.firstOffset,
-      dupMultiEntryAppendInfo.firstOffset,
-      "Somehow appended a duplicate entry with multiple log records to the 
tail"
-    )
-    assertEquals(multiEntryAppendInfo.lastOffset, 
dupMultiEntryAppendInfo.lastOffset,
-      "Somehow appended a duplicate entry with multiple log records to the 
tail")
-
-    seq = seq + 3
-
-    // Append a partial duplicate of the tail. This is not allowed.
-    var records = TestUtils.records(
-      List(
-        new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes),
-        new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, 
s"value-$seq".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = seq - 2)
-    assertThrows(classOf[OutOfOrderSequenceException], () => 
log.appendAsLeader(records, 0),
-      () => "Should have received an OutOfOrderSequenceException since we 
attempted to append a duplicate of a records in the middle of the log.")
-
-    // Append a duplicate of the batch which is 4th from the tail. This should 
succeed without error since we
-    // retain the batch metadata of the last 5 batches.
-    val duplicateOfFourth = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 2)
-    log.appendAsLeader(duplicateOfFourth, 0)
-
-    // Duplicates at older entries are reported as OutOfOrderSequence errors
-    records = TestUtils.records(
-      List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, 
s"value-1".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 1)
-    assertThrows(classOf[OutOfOrderSequenceException], () => 
log.appendAsLeader(records, 0),
-      () => "Should have received an OutOfOrderSequenceException since we 
attempted to append a duplicate of a batch which is older than the last 5 
appended batches.")
-
-    // Append a duplicate entry with a single records at the tail of the log. 
This should return the appendInfo of the original entry.
-    def createRecordsWithDuplicate = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = seq)
-    val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 0)
-    val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, 0)
-    assertEquals(
-      origAppendInfo.firstOffset,
-      newAppendInfo.firstOffset,
-      "Inserted a duplicate records into the log"
-    )
-    assertEquals(origAppendInfo.lastOffset, newAppendInfo.lastOffset,
-      "Inserted a duplicate records into the log")
-  }
-
-  @Test
-  def testMultipleProducerIdsPerMemoryRecord(): Unit = {
-    // create a log
-    val log = createLog(logDir, new LogConfig(new Properties))
-
-    val producerEpoch: Short = 0
-    val partitionLeaderEpoch = 0
-    val buffer = ByteBuffer.allocate(512)
-
-    var builder = MemoryRecords.builder(
-      buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L, 
producerEpoch, 0, false,
-      partitionLeaderEpoch
-    )
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L, 
producerEpoch, 0, false,
-      partitionLeaderEpoch)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    builder = MemoryRecords.builder(
-      buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L, 
producerEpoch, 0, false,
-      partitionLeaderEpoch
-    )
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    builder = MemoryRecords.builder(
-      buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L, 
producerEpoch, 0, false,
-      partitionLeaderEpoch
-    )
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    buffer.flip()
-    val memoryRecords = MemoryRecords.readableRecords(buffer)
-
-    log.appendAsFollower(memoryRecords, partitionLeaderEpoch)
-    log.flush(false)
-
-    val fetchedData = LogTestUtils.readLog(log, 0, Int.MaxValue)
-
-    val origIterator = memoryRecords.batches.iterator()
-    for (batch <- fetchedData.records.batches.asScala) {
-      assertTrue(origIterator.hasNext)
-      val origEntry = origIterator.next()
-      assertEquals(origEntry.producerId, batch.producerId)
-      assertEquals(origEntry.baseOffset, batch.baseOffset)
-      assertEquals(origEntry.baseSequence, batch.baseSequence)
-    }
-  }
-
-  @Test
-  def testDuplicateAppendToFollower(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val producerEpoch: Short = 0
-    val pid = 1L
-    val baseSequence = 0
-    val partitionLeaderEpoch = 0
-    // The point of this test is to ensure that validation isn't performed on 
the follower.
-    // this is a bit contrived. to trigger the duplicate case for a follower 
append, we have to append
-    // a batch with matching sequence numbers, but valid increasing offsets
-    assertEquals(0L, log.logEndOffset)
-    log.appendAsFollower(
-      MemoryRecords.withIdempotentRecords(
-        0L,
-        Compression.NONE,
-        pid,
-        producerEpoch,
-        baseSequence,
-        partitionLeaderEpoch,
-        new SimpleRecord("a".getBytes),
-        new SimpleRecord("b".getBytes)
-      ),
-      partitionLeaderEpoch
-    )
-    log.appendAsFollower(
-      MemoryRecords.withIdempotentRecords(
-        2L,
-        Compression.NONE,
-        pid,
-        producerEpoch,
-        baseSequence,
-        partitionLeaderEpoch,
-        new SimpleRecord("a".getBytes),
-        new SimpleRecord("b".getBytes)
-      ),
-      partitionLeaderEpoch
-    )
-
-    // Ensure that even the duplicate sequences are accepted on the follower.
-    assertEquals(4L, log.logEndOffset)
-  }
-
-  @Test
-  def testMultipleProducersWithDuplicatesInSingleAppend(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-
-    val pid1 = 1L
-    val pid2 = 2L
-    val producerEpoch: Short = 0
-
-    val buffer = ByteBuffer.allocate(512)
-
-    // pid1 seq = 0
-    var builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1, 
producerEpoch, 0)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    // pid2 seq = 0
-    builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2, 
producerEpoch, 0)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    // pid1 seq = 1
-    builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1, 
producerEpoch, 1)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    // pid2 seq = 1
-    builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2, 
producerEpoch, 1)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    // // pid1 seq = 1 (duplicate)
-    builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE,
-      TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1, 
producerEpoch, 1)
-    builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-    builder.close()
-
-    buffer.flip()
-
-    val epoch = 0
-    val records = MemoryRecords.readableRecords(buffer)
-    records.batches.forEach(_.setPartitionLeaderEpoch(epoch))
-
-    // Ensure that batches with duplicates are accepted on the follower.
-    assertEquals(0L, log.logEndOffset)
-    log.appendAsFollower(records, epoch)
-    assertEquals(5L, log.logEndOffset)
-  }
-
-  @Test
-  def testOldProducerEpoch(): Unit = {
-    // create a log
-    val log = createLog(logDir, new LogConfig(new Properties))
-    val pid = 1L
-    val newEpoch: Short = 1
-    val oldEpoch: Short = 0
-
-    val records = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), 
producerId = pid, producerEpoch = newEpoch, sequence = 0)
-    log.appendAsLeader(records, 0)
-
-    val nextRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), 
producerId = pid, producerEpoch = oldEpoch, sequence = 0)
-    assertThrows(classOf[InvalidProducerEpochException], () => 
log.appendAsLeader(nextRecords, 0))
-  }
-
-  @Test
-  def testDeleteSnapshotsOnIncrementLogStartOffset(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val pid2 = 2L
-    val epoch = 0.toShort
-
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-
-    assertEquals(2, log.activeProducersWithLastSequence.size)
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(2L, 
LogStartOffsetIncrementReason.ClientRecordDeletion)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")// force retention to kick in so that the snapshot files are cleaned 
up.
-    mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so 
file deletion takes place
-
-    // Deleting records should not remove producer state but should delete 
snapshots after the file deletion delay.
-    assertEquals(2, log.activeProducersWithLastSequence.size)
-    assertEquals(1, ProducerStateManager.listSnapshotFiles(log.dir).size)
-    val retainedLastSeq = log.activeProducersWithLastSequence.get(pid2)
-    assertEquals(0, retainedLastSeq)
-  }
-
-  /**
-   * Test for jitter s for time based log roll. This test appends messages 
then changes the time
-   * using the mock clock to force the log to roll and checks the number of 
segments.
-   */
-  @Test
-  def testTimeBasedLogRollJitter(): Unit = {
-    var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
mockTime.milliseconds)
-    val maxJitter = 20 * 60L
-    // create a log
-    val logConfig = LogTestUtils.createLogConfig(segmentMs = 1 * 60 * 60L, 
segmentJitterMs = maxJitter)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "Log begins with a single empty 
segment.")
-    log.appendAsLeader(set, 0)
-
-    mockTime.sleep(log.config.segmentMs - maxJitter)
-    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
mockTime.milliseconds)
-    log.appendAsLeader(set, 0)
-    assertEquals(1, log.numberOfSegments,
-      "Log does not roll on this append because it occurs earlier than max 
jitter")
-    mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
-    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = 
mockTime.milliseconds)
-    log.appendAsLeader(set, 0)
-    assertEquals(2, log.numberOfSegments,
-      "Log should roll after segmentMs adjusted by random jitter")
-  }
-
-  /**
-   * Test that appending more than the maximum segment size rolls the log
-   */
-  @Test
-  def testSizeBasedLogRoll(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds)
-    val setSize = createRecords.sizeInBytes
-    val msgPerSeg = 10
-    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 
messages
-    // create a log
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
-    // segments expire in size
-    for (_ <- 1 to (msgPerSeg + 1))
-      log.appendAsLeader(createRecords, 0)
-    assertEquals(2, log.numberOfSegments,
-      "There should be exactly 2 segments.")
-  }
-
-  /**
-   * Test that we can open and append to an empty log
-   */
-  @Test
-  def testLoadEmptyLog(): Unit = {
-    createEmptyLogs(logDir, 0)
-    val log = createLog(logDir, new LogConfig(new Properties))
-    log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds), 0)
-  }
-
-  /**
-   * This test case appends a bunch of messages and checks that we can read 
them all back using sequential offsets.
-   */
-  @Test
-  def testAppendAndReadWithSequentialOffsets(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 71)
-    val log = createLog(logDir, logConfig)
-    val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
-
-    for (value <- values)
-      log.appendAsLeader(TestUtils.singletonRecords(value = value), 0)
-
-    for (i <- values.indices) {
-      val read = LogTestUtils.readLog(log, i, 
1).records.batches.iterator.next()
-      assertEquals(i, read.lastOffset, "Offset read should match order 
appended.")
-      val actual = read.iterator.next()
-      assertNull(actual.key, "Key should be null")
-      assertEquals(ByteBuffer.wrap(values(i)), actual.value, "Values not 
equal")
-    }
-    assertEquals(0, LogTestUtils.readLog(log, values.length, 
100).records.batches.asScala.size,
-      "Reading beyond the last message returns nothing.")
-  }
-
-  /**
-   * This test appends a bunch of messages with non-sequential offsets and 
checks that we can an the correct message
-   * from any offset less than the logEndOffset including offsets not appended.
-   */
-  @Test
-  def testAppendAndReadWithNonSequentialOffsets(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 72)
-    val log = createLog(logDir, logConfig)
-    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
-
-    // now test the case that we give the offsets and use non-sequential 
offsets
-    for (i <- records.indices) {
-      log.appendAsFollower(
-        MemoryRecords.withRecords(messageIds(i), Compression.NONE, 0, 
records(i)),
-        Int.MaxValue
-      )
-    }
-    for (i <- 50 until messageIds.max) {
-      val idx = messageIds.indexWhere(_ >= i)
-      val read = LogTestUtils.readLog(log, i, 
100).records.records.iterator.next()
-      assertEquals(messageIds(idx), read.offset, "Offset read should match 
message id.")
-      assertEquals(records(idx), new SimpleRecord(read), "Message should match 
appended.")
-    }
-  }
-
-  /**
-   * This test covers an odd case where we have a gap in the offsets that 
falls at the end of a log segment.
-   * Specifically we create a log where the last message in the first segment 
has offset 0. If we
-   * then read offset 1, we should expect this read to come from the second 
segment, even though the
-   * first segment has the greatest lower bound on the offset.
-   */
-  @Test
-  def testReadAtLogGap(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 300)
-    val log = createLog(logDir, logConfig)
-
-    // keep appending until we have two segments with only a single message in 
the second segment
-    while (log.numberOfSegments == 1)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), 0)
-
-    // now manually truncate off all but one message from the first segment to 
create a gap in the messages
-    log.logSegments.asScala.head.truncateTo(1)
-
-    assertEquals(log.logEndOffset - 1, LogTestUtils.readLog(log, 1, 
200).records.batches.iterator.next().lastOffset,
-      "A read should now return the last message in the log")
-  }
-
-  @Test
-  def testLogRollAfterLogHandlerClosed(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig()
-    val log = createLog(logDir,  logConfig)
-    log.closeHandlers()
-    assertThrows(classOf[KafkaStorageException], () => 
log.roll(Optional.of(1L)))
-  }
-
   private def createKafkaConfigWithRLM: KafkaConfig = {
     val props = new Properties()
     props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index ed30e25032e..92a1e8cf0d9 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.common.RequestLocal;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -327,4 +328,46 @@ public class LogTestUtils {
             return new LogConfig(configs);
         }
     }
+    
+    /**
+     * A simple {@link OffsetMap} backed by a {@link HashMap}, for tests that need an offset map for the cleaner.
+     * Keys are decoded as UTF-8 strings; looking up an absent key returns -1.
+     */
+    public static class FakeOffsetMap implements OffsetMap {
+
+        private final int slots;
+        private final Map<String, Long> map = new HashMap<>();
+        private long latestOff = -1L;
+
+        /** Creates a map whose {@link #slots()} is {@link Integer#MAX_VALUE}. */
+        public FakeOffsetMap() {
+            this(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Creates a map that reports the given slot count.
+         *
+         * @param slots the value returned by {@link #slots()}
+         */
+        public FakeOffsetMap(int slots) {
+            this.slots = slots;
+        }
+
+        // Decode from a duplicate so the caller's buffer position is left untouched.
+        private static String keyFor(ByteBuffer key) {
+            return new String(Utils.readBytes(key.duplicate()), StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public int slots() {
+            return slots;
+        }
+
+        @Override
+        public void put(ByteBuffer key, long offset) {
+            latestOff = offset;
+            map.put(keyFor(key), offset);
+        }
+
+        @Override
+        public long get(ByteBuffer key) {
+            return map.getOrDefault(keyFor(key), -1L);
+        }
+
+        @Override
+        public void updateLatestOffset(long offset) {
+            latestOff = offset;
+        }
+
+        @Override
+        public void clear() {
+            map.clear();
+            // Also forget the latest offset, so a cleared map does not report an offset left over from a
+            // previous build.
+            latestOff = -1L;
+        }
+
+        @Override
+        public int size() {
+            return map.size();
+        }
+
+        @Override
+        public long latestOffset() {
+            return latestOff;
+        }
+
+        @Override
+        public String toString() {
+            return map.toString();
+        }
+    }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 1dc4134c490..a0a207c512a 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -59,6 +61,7 @@ import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.utils.Throttler;
 import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
@@ -77,14 +80,17 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.security.DigestException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
@@ -93,6 +99,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -142,7 +149,7 @@ public class UnifiedLogTest {
 
     @Test
     public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException 
{
-        SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+        SimpleRecord[] records = IntStream.range(0, 50)
             .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
             .toArray(SimpleRecord[]::new);
 
@@ -167,7 +174,7 @@ public class UnifiedLogTest {
 
     @Test
     public void 
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() 
throws IOException {
-        int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+        int[] messageIds = IntStream.range(0, 50).toArray();
         SimpleRecord[] records = Arrays.stream(messageIds)
             .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
             .toArray(SimpleRecord[]::new);
@@ -1632,6 +1639,709 @@ public class UnifiedLogTest {
         assertEquals(3, log.logStartOffset());
     }
 
+    @Test
+    public void testRetentionIdempotency() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .retentionMs(900)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+        mockTime.sleep(901);
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(1L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertEquals(
+                2, 
+                log.deleteOldSegments(),
+                "Expecting two segment deletions as log start offset retention 
should unblock time based retention"
+        );
+        assertEquals(0, log.deleteOldSegments());
+    }
+
+    @Test
+    public void testLogStartOffsetMovementDeletesSnapshots() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, config);
+        long producerId = 1L;
+        short producerEpoch = 0;
+
+        // One idempotent batch per segment, with consecutive sequence numbers 0, 1, 2.
+        String[] values = {"a", "b", "c"};
+        for (int sequence = 0; sequence < values.length; sequence++) {
+            if (sequence > 0) {
+                log.roll();
+            }
+            log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord(values[sequence].getBytes())),
+                    producerId, producerEpoch, sequence, 0L), 0);
+        }
+        log.updateHighWatermark(log.logEndOffset());
+        // Two rolls, two producer state snapshots.
+        assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+        // Move the log start offset to the last record so the first two segments fall below it.
+        long newLogStartOffset = log.logEndOffset() - 1;
+        log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        // fileDeleteDelayMs is 0, so ticking the clock lets the scheduled file deletions run.
+        mockTime.sleep(1);
+        assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expect a single producer state snapshot remaining");
+    }
+
+    /**
+     * Compacting a log must keep the producer state snapshots aligned with the segments that remain:
+     * one snapshot per segment base offset, except for the first segment.
+     */
+    @Test
+    public void testCompactionDeletesProducerStateSnapshots() throws IOException, DigestException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid1 = 1L;
+        short epoch = 0;
+
+        OffsetMap fakeOffsetMap = new LogTestUtils.FakeOffsetMap();
+
+        Cleaner cleaner = new Cleaner(0, fakeOffsetMap, 64 * 1024, 64 * 1024, 0.75,
+                new Throttler(Double.MAX_VALUE, Long.MAX_VALUE, "throttler", "entries", mockTime),
+                mockTime, tp -> { });
+
+        // Three segments; every record uses the same key "a", with values "a", "b", "c".
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("a".getBytes(), "a".getBytes())),
+                pid1, epoch, 0, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("a".getBytes(), "b".getBytes())),
+                pid1, epoch, 1, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("a".getBytes(), "c".getBytes())),
+                pid1, epoch, 2, 0L), 0);
+        log.updateHighWatermark(log.logEndOffset());
+
+        List<Long> expectedSnapshotOffsets = log.logSegments()
+                .stream()
+                .map(LogSegment::baseOffset)
+                .sorted()
+                .skip(1)
+                .collect(Collectors.toList());
+        List<Long> snapshotOffsets = ProducerStateManager.listSnapshotFiles(logDir)
+                .stream()
+                .map(f -> f.offset)
+                .sorted()
+                .collect(Collectors.toList());
+        assertEquals(
+                expectedSnapshotOffsets,
+                snapshotOffsets,
+                "expected a snapshot file per segment base offset, except the first segment"
+        );
+        assertEquals(2, snapshotOffsets.size());
+
+        // Compact up to the log end offset. All records share the key "a", so the records in the
+        // inactive segments are superseded by the latest one and are discarded by the cleaner.
+        cleaner.clean(new LogToClean(log, 0, log.logEndOffset(), false));
+        // The cleaner, not retention, discarded the superseded records. Under this compact-only
+        // policy, deleteOldSegments() is therefore expected to remove no segments.
+        assertEquals(0, log.deleteOldSegments());
+        // Sleep to breach the (zero) file delete delay so the scheduled file deletion tasks run.
+        mockTime.sleep(1);
+
+        List<Long> expectedSnapshotOffsetsAfterClean = log.logSegments()
+                .stream()
+                .map(LogSegment::baseOffset)
+                .sorted()
+                .skip(1)
+                .collect(Collectors.toList());
+        List<Long> snapshotOffsetsAfterClean = ProducerStateManager.listSnapshotFiles(logDir)
+                .stream()
+                .map(f -> f.offset)
+                .sorted()
+                .collect(Collectors.toList());
+        assertEquals(expectedSnapshotOffsetsAfterClean, snapshotOffsetsAfterClean,
+                "expected a snapshot file per segment base offset, excluding the first");
+    }
+
+    /**
+     * After loading the log, producer state is truncated such that there are 
no producer state snapshot files which
+     * exceed the log end offset. This test verifies that these are removed.
+     */
+    @Test
+    public void testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset() 
throws IOException {
+        Files.createFile(LogFileUtils.producerSnapshotFile(logDir, 
42).toPath());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .fileDeleteDelayMs(0)
+                .build();
+        createLog(logDir, logConfig);
+        assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expected producer state snapshots greater than the log end 
offset to be cleaned up");
+    }
+
+    @Test
+    public void testProducerIdMapTruncateFullyAndStartAt() throws IOException {
+        MemoryRecords records = singletonRecords("foo".getBytes());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.sizeInBytes())
+                .retentionBytes((long) records.sizeInBytes() * 2)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(records, 0);
+        log.takeProducerSnapshot();
+
+        log.appendAsLeader(singletonRecords("bar".getBytes()), 0);
+        log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+        log.takeProducerSnapshot();
+
+        assertEquals(3, log.logSegments().size());
+        assertEquals(3, log.latestProducerStateEndOffset());
+        assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset());
+
+        log.truncateFullyAndStartAt(29, Optional.empty());
+        assertEquals(1, log.logSegments().size());
+        assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+        assertEquals(29, log.latestProducerStateEndOffset());
+    }
+
+    @Test
+    public void testProducerIdExpirationOnSegmentDeletion() throws IOException {
+        long firstProducerId = 1L;
+        long secondProducerId = 2L;
+        short epoch = 0;
+        MemoryRecords firstBatch = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())),
+                firstProducerId, epoch, 0, 0L);
+        int batchSize = firstBatch.sizeInBytes();
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(batchSize)
+                .retentionBytes((long) batchSize * 2)
+                .build();
+        UnifiedLog log = createLog(logDir, config);
+        log.appendAsLeader(firstBatch, 0);
+        log.takeProducerSnapshot();
+
+        // The second producer writes two batches with sequences 0 and 1.
+        String[] values = {"bar", "baz"};
+        for (int sequence = 0; sequence < values.length; sequence++) {
+            log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord(values[sequence].getBytes())),
+                    secondProducerId, epoch, sequence, 0L), 0);
+        }
+        log.takeProducerSnapshot();
+
+        Set<Long> bothProducers = Set.of(firstProducerId, secondProducerId);
+        assertEquals(3, log.logSegments().size());
+        assertEquals(bothProducers, log.activeProducersWithLastSequence().keySet());
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+
+        // Deleting a segment must leave the producer state intact.
+        assertEquals(2, log.logSegments().size());
+        assertEquals(bothProducers, log.activeProducersWithLastSequence().keySet());
+    }
+
+    @Test
+    public void 
testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(singletonRecords("a".getBytes()), 0);
+        log.roll(Optional.of(1L));
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        log.appendAsLeader(singletonRecords("b".getBytes()), 0);
+        log.roll(Optional.of(2L));
+        assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset());
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        log.appendAsLeader(singletonRecords("c".getBytes()), 0);
+        log.roll(Optional.of(3L));
+        assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+
+        // roll triggers a flush at the starting offset of the new segment, we 
should retain all snapshots
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        // even if we flush within the active segment, the snapshot should 
remain
+        log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+        log.flushUptoOffsetExclusive(4L);
+        assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+    }
+
+    @Test
+    public void testProducerSnapshotAfterSegmentRollOnAppend() throws IOException {
+        long producerId = 1L;
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(1024).build();
+        UnifiedLog log = createLog(logDir, config);
+
+        // Two 512-byte payloads against a 1024-byte segment: the second append overflows the first
+        // segment and triggers a roll.
+        for (int sequence = 0; sequence < 2; sequence++) {
+            SimpleRecord record = new SimpleRecord(mockTime.milliseconds(), new byte[512]);
+            log.appendAsLeader(LogTestUtils.records(List.of(record), producerId, (short) 0, sequence, 0L), 0);
+        }
+
+        assertEquals(2, log.logSegments().size());
+        assertEquals(1L, log.activeSegment().baseOffset());
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+        // Truncating forces producer state to be reloaded from the snapshot, which exercises its consistency.
+        log.truncateTo(1L);
+
+        assertEquals(2, log.logSegments().size());
+        assertEquals(1L, log.activeSegment().baseOffset());
+        assertFalse(log.activeSegment().log().batches().iterator().hasNext());
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+        Optional<ProducerStateEntry> lastEntry = log.producerStateManager().lastEntry(producerId);
+        assertTrue(lastEntry.isPresent());
+        // Only the batch at offset 0 is left for this producer.
+        assertEquals(0L, lastEntry.get().firstDataOffset());
+        assertEquals(0L, lastEntry.get().lastDataOffset());
+    }
+
+    @Test
+    public void testRebuildTransactionalState() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        UnifiedLog log = createLog(logDir, config);
+
+        long producerId = 137L;
+        short producerEpoch = 5;
+        int baseSequence = 0;
+
+        // Write a three-record transaction and abort it.
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch,
+                baseSequence,
+                new SimpleRecord("foo".getBytes()),
+                new SimpleRecord("bar".getBytes()),
+                new SimpleRecord("baz".getBytes())), 0);
+        LogAppendInfo abortMarker = LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, producerEpoch,
+                ControlRecordType.ABORT, mockTime.milliseconds(), 0, 0,
+                TransactionVersion.TV_0.featureLevel());
+        long highWatermark = abortMarker.lastOffset() + 1;
+        log.updateHighWatermark(highWatermark);
+
+        // With the transaction completed there is no first unstable offset.
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+        log.close();
+
+        // The reopened log must reach the same conclusion.
+        UnifiedLog reopenedLog = createLog(logDir, config, 0L, 0L, brokerTopicStats,
+                mockTime.scheduler, mockTime, producerStateManagerConfig, false,
+                Optional.empty(), false);
+        reopenedLog.updateHighWatermark(highWatermark);
+        assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+    }
+
+    @Test
+    public void testPeriodicProducerIdExpiration() throws IOException {
+        ProducerStateManagerConfig expiringConfig = new ProducerStateManagerConfig(200, false);
+        int checkIntervalMs = 100;
+        long producerId = 23L;
+
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, config, 0L, 0L, brokerTopicStats,
+                mockTime.scheduler, mockTime, expiringConfig, true, Optional.empty(), false,
+                checkIntervalMs);
+        SimpleRecord record = new SimpleRecord(mockTime.milliseconds(), "foo".getBytes());
+        log.appendAsLeader(LogTestUtils.records(List.of(record), producerId, (short) 0, 0, 0L), 0);
+
+        Set<Long> onlyThisProducer = Set.of(producerId);
+        assertEquals(onlyThisProducer, log.activeProducersWithLastSequence().keySet());
+
+        // One check interval later the producer is still tracked...
+        mockTime.sleep(checkIntervalMs);
+        assertEquals(onlyThisProducer, log.activeProducersWithLastSequence().keySet());
+
+        // ...and after a second interval it has been expired.
+        mockTime.sleep(checkIntervalMs);
+        assertEquals(Set.of(), log.activeProducersWithLastSequence().keySet());
+    }
+
+    /**
+     * Exercises idempotent-producer deduplication on the leader: exact duplicates of recent batches are
+     * accepted and return the original append info, while partial duplicates and duplicates older than
+     * the retained batch metadata are rejected with OutOfOrderSequenceException.
+     */
+    @Test
+    public void testDuplicateAppends() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+        long pid = 1L;
+        short epoch = 0;
+
+        // Pad the beginning of the log with six single-record batches (sequences 0..5).
+        int nextSeq = 0;
+        while (nextSeq < 6) {
+            log.appendAsLeader(LogTestUtils.records(
+                    List.of(new SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())),
+                    pid, epoch, nextSeq, 0L), 0);
+            nextSeq++;
+        }
+
+        // A three-record batch, built on demand so that it can be appended twice.
+        final int multiRecordSeq = nextSeq;
+        Supplier<MemoryRecords> multiRecordBatch = () -> LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + multiRecordSeq).getBytes(),
+                        ("value-" + multiRecordSeq).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + multiRecordSeq).getBytes(),
+                        ("value-" + multiRecordSeq).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + multiRecordSeq).getBytes(),
+                        ("value-" + multiRecordSeq).getBytes())
+        ), pid, epoch, multiRecordSeq, 0L);
+        LogAppendInfo multiEntryAppendInfo = log.appendAsLeader(multiRecordBatch.get(), 0);
+        assertEquals(3, multiEntryAppendInfo.lastOffset() - multiEntryAppendInfo.firstOffset() + 1,
+                "should have appended 3 entries");
+
+        // Append an exact duplicate of the multi-record batch at the tail; it must be deduplicated.
+        LogAppendInfo dupMultiEntryAppendInfo = log.appendAsLeader(multiRecordBatch.get(), 0);
+        assertEquals(multiEntryAppendInfo.firstOffset(), dupMultiEntryAppendInfo.firstOffset(),
+                "Somehow appended a duplicate entry with multiple log records to the tail");
+        assertEquals(multiEntryAppendInfo.lastOffset(), dupMultiEntryAppendInfo.lastOffset(),
+                "Somehow appended a duplicate entry with multiple log records to the tail");
+
+        nextSeq += 3;
+
+        // Append a partial duplicate of the tail. This is not allowed.
+        MemoryRecords partialDup = LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + nextSeq).getBytes(), ("value-" + nextSeq).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + nextSeq).getBytes(), ("value-" + nextSeq).getBytes())
+        ), pid, epoch, nextSeq - 2, 0L);
+        assertThrows(OutOfOrderSequenceException.class, () -> log.appendAsLeader(partialDup, 0),
+                () -> "Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records in the middle of the log.");
+
+        // The last five batches now have sequences 2, 3, 4, 5 and 6 (the multi-record batch), so the batch
+        // with sequence 2 is the oldest one whose metadata is still retained. A duplicate of it is accepted
+        // without error.
+        MemoryRecords duplicateOfOldestRetained = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())),
+                pid, epoch, 2, 0L);
+        log.appendAsLeader(duplicateOfOldestRetained, 0);
+
+        // Sequence 1 is older than the retained metadata, so its duplicate is reported as OutOfOrderSequence.
+        MemoryRecords oldDup = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), "key-1".getBytes(), "value-1".getBytes())),
+                pid, epoch, 1, 0L);
+        assertThrows(OutOfOrderSequenceException.class, () -> log.appendAsLeader(oldDup, 0),
+                () -> "Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a batch which is older than the last 5 appended batches.");
+
+        // Append a single-record batch at the tail and then its duplicate; the duplicate must return the
+        // original append info.
+        final int singleRecordSeq = nextSeq;
+        Supplier<MemoryRecords> singleRecordBatch = () -> LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())),
+                pid, epoch, singleRecordSeq, 0L);
+        LogAppendInfo origAppendInfo = log.appendAsLeader(singleRecordBatch.get(), 0);
+        LogAppendInfo newAppendInfo = log.appendAsLeader(singleRecordBatch.get(), 0);
+        assertEquals(origAppendInfo.firstOffset(), newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
+        assertEquals(origAppendInfo.lastOffset(), newAppendInfo.lastOffset(), "Inserted a duplicate records into the log");
+    }
+
+    /**
+     * A single follower append can carry batches from several producers. Every batch must be read back
+     * with its producer id, base offset and base sequence intact, and no batch may be lost.
+     */
+    @Test
+    public void testMultipleProducerIdsPerMemoryRecord() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        int batchCount = 4;
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+
+        // Write single-record batches back to back into one buffer: batch i has base offset i and
+        // producer id i + 1.
+        for (int i = 0; i < batchCount; i++) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(
+                    buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                    TimestampType.LOG_APPEND_TIME, (long) i, mockTime.milliseconds(), i + 1L, producerEpoch, 0, false,
+                    partitionLeaderEpoch);
+            builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+            builder.close();
+        }
+
+        buffer.flip();
+        MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+        log.appendAsFollower(memoryRecords, partitionLeaderEpoch);
+        log.flush(false);
+
+        FetchDataInfo fetchedData = log.read(0, Integer.MAX_VALUE, FetchIsolation.LOG_END, true);
+
+        Iterator<? extends RecordBatch> origIterator = memoryRecords.batches().iterator();
+        for (RecordBatch batch : fetchedData.records.batches()) {
+            assertTrue(origIterator.hasNext(), "Read back more batches than were appended");
+            RecordBatch origEntry = origIterator.next();
+            assertEquals(origEntry.producerId(), batch.producerId());
+            assertEquals(origEntry.baseOffset(), batch.baseOffset());
+            assertEquals(origEntry.baseSequence(), batch.baseSequence());
+        }
+        // Without this check, a read that returned fewer batches than were appended would pass silently.
+        assertFalse(origIterator.hasNext(), "Some appended batches were not read back");
+    }
+
+    @Test
+    public void testDuplicateAppendToFollower() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        UnifiedLog log = createLog(logDir, config);
+        short producerEpoch = 0;
+        long pid = 1L;
+        int baseSequence = 0;
+        int partitionLeaderEpoch = 0;
+        // The aim is to show that a follower does not perform this validation. To provoke the duplicate
+        // case on a follower, append two batches that reuse the same sequence numbers but have valid,
+        // increasing offsets.
+        assertEquals(0L, log.logEndOffset());
+        for (long baseOffset : new long[] {0L, 2L}) {
+            log.appendAsFollower(
+                    MemoryRecords.withIdempotentRecords(baseOffset, Compression.NONE, pid, producerEpoch, baseSequence,
+                            partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+                    partitionLeaderEpoch);
+        }
+
+        // Both batches, the duplicate sequences included, were accepted.
+        assertEquals(4L, log.logEndOffset());
+    }
+
+    @Test
+    public void testMultipleProducersWithDuplicatesInSingleAppend() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        UnifiedLog log = createLog(logDir, config);
+
+        long pid1 = 1L;
+        long pid2 = 2L;
+        short producerEpoch = 0;
+
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+
+        // One single-record batch per entry, written back to back with base offsets 0..4.
+        // The last entry repeats pid1's sequence 1, making it a duplicate.
+        long[] producerIds = {pid1, pid2, pid1, pid2, pid1};
+        int[] baseSequences = {0, 0, 1, 1, 1};
+        for (int i = 0; i < producerIds.length; i++) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                    Compression.NONE, TimestampType.LOG_APPEND_TIME, (long) i, mockTime.milliseconds(),
+                    producerIds[i], producerEpoch, baseSequences[i]);
+            builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+            builder.close();
+        }
+
+        buffer.flip();
+
+        int leaderEpoch = 0;
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        records.batches().forEach(batch -> batch.setPartitionLeaderEpoch(leaderEpoch));
+
+        // The follower accepts every batch, the duplicate included.
+        assertEquals(0L, log.logEndOffset());
+        log.appendAsFollower(records, leaderEpoch);
+        assertEquals(5L, log.logEndOffset());
+    }
+
+    @Test
+    public void testOldProducerEpoch() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+        long producerId = 1L;
+        short currentEpoch = 1;
+        short staleEpoch = 0;
+
+        // The first write comes from epoch 1.
+        MemoryRecords currentEpochBatch = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())),
+                producerId, currentEpoch, 0, 0L);
+        log.appendAsLeader(currentEpochBatch, 0);
+
+        // A batch from the same producer with the older epoch 0 is rejected.
+        MemoryRecords staleEpochBatch = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), "key".getBytes(), "value".getBytes())),
+                producerId, staleEpoch, 0, 0L);
+        assertThrows(InvalidProducerEpochException.class, () -> log.appendAsLeader(staleEpochBatch, 0));
+    }
+
+    @Test
+    public void testDeleteSnapshotsOnIncrementLogStartOffset() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, config);
+        long pid1 = 1L;
+        long pid2 = 2L;
+        short epoch = 0;
+
+        // Two segments, each holding one batch from a different producer, followed by a roll.
+        long[] producerIds = {pid1, pid2};
+        String[] values = {"a", "b"};
+        for (int i = 0; i < producerIds.length; i++) {
+            SimpleRecord record = new SimpleRecord(mockTime.milliseconds(), values[i].getBytes());
+            log.appendAsLeader(LogTestUtils.records(List.of(record), producerIds[i], epoch, 0, 0L), 0);
+            log.roll();
+        }
+
+        assertEquals(2, log.activeProducersWithLastSequence().size());
+        assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir()).size());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        // Advance the clock beyond the file delete delay so that the scheduled file deletion runs.
+        mockTime.sleep(config.fileDeleteDelayMs + 1000);
+
+        // Record deletion keeps the producer state but removes a snapshot once the delay has passed.
+        assertEquals(2, log.activeProducersWithLastSequence().size());
+        assertEquals(1, ProducerStateManager.listSnapshotFiles(log.dir()).size());
+        int lastSequenceOfPid2 = log.activeProducersWithLastSequence().get(pid2);
+        assertEquals(0, lastSequenceOfPid2);
+    }
+
+    /**
+     * Time-based rolling with jitter: with a one-hour segmentMs and a configured maximum jitter, an append
+     * made exactly {@code segmentMs - maxJitter} after the first one stays in the active segment, while an
+     * append one millisecond past the segment's own (random) jitter deadline rolls a new segment.
+     */
+    @Test
+    public void testTimeBasedLogRollJitter() throws IOException {
+        final long maxJitter = 20 * 60L;
+        MemoryRecords firstBatch = singletonRecords("test".getBytes(), mockTime.milliseconds());
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentMs(ONE_HOUR)
+                .segmentJitterMs(maxJitter)
+                .build());
+        assertEquals(1, log.numberOfSegments(), "Log begins with a single empty segment.");
+        log.appendAsLeader(firstBatch, 0);
+
+        // Still inside the window allowed by the maximum jitter: no roll expected.
+        mockTime.sleep(log.config().segmentMs - maxJitter);
+        log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        assertEquals(1, log.numberOfSegments(),
+                "Log does not roll on this append because it occurs earlier than max jitter");
+
+        // Step one millisecond beyond the active segment's actual jitter.
+        mockTime.sleep(maxJitter - log.activeSegment().rollJitterMs() + 1);
+        log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        assertEquals(2, log.numberOfSegments(),
+                "Log should roll after segmentMs adjusted by random jitter");
+    }
+
+    /**
+     * Test that appending more than the maximum segment size rolls the log.
+     */
+    @Test
+    public void testSizeBasedLogRoll() throws IOException {
+        final int messagesPerSegment = 10;
+        final int batchSize = singletonRecords("test".getBytes(), mockTime.milliseconds()).sizeInBytes();
+        // 10 * (batchSize - 1) bytes is ten bytes short of ten full batches, so a segment fills up within ten
+        // appends; appending messagesPerSegment + 1 batches is expected to produce exactly one roll.
+        UnifiedLog log = createLog(logDir,
+                new LogTestUtils.LogConfigBuilder().segmentBytes(messagesPerSegment * (batchSize - 1)).build());
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        for (int appended = 0; appended < messagesPerSegment + 1; appended++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        }
+        assertEquals(2, log.numberOfSegments(), "There should be exactly 2 segments.");
+    }
+
+    /**
+     * Test that we can open a log from a pre-created, empty log file and offset index, and append to it.
+     */
+    @Test
+    public void testLoadEmptyLog() throws IOException {
+        Files.createFile(LogFileUtils.logFile(logDir, 0).toPath());
+        Files.createFile(LogFileUtils.offsetIndexFile(logDir, 0).toPath());
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+        // The pre-created files contain no data, so the loaded log must start out empty.
+        assertEquals(0L, log.logEndOffset(), "Loading empty segment files should yield an empty log");
+
+        log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        // Previously the test only checked that nothing threw; verify the append actually landed.
+        assertEquals(1L, log.logEndOffset(), "Appending one record should advance the log end offset");
+    }
+
+    /**
+     * This test case appends a bunch of messages and checks that we can read them all back using sequential
+     * offsets, and that a read past the last offset returns no batches.
+     */
+    @Test
+    public void testAppendAndReadWithSequentialOffsets() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(71).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        // 50 values: "0", "2", ..., "98".
+        byte[][] values = IntStream.iterate(0, i -> i + 2).limit(50)
+                .mapToObj(id -> String.valueOf(id).getBytes())
+                .toArray(byte[][]::new);
+
+        for (byte[] value : values) {
+            log.appendAsLeader(singletonRecords(value), 0);
+        }
+
+        for (int i = 0; i < values.length; i++) {
+            RecordBatch read = log.read(i, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next();
+            assertEquals(i, read.lastOffset(), "Offset read should match order appended.");
+            Record actual = read.iterator().next();
+            assertNull(actual.key(), "Key should be null");
+            assertEquals(ByteBuffer.wrap(values[i]), actual.value(), "Values not equal");
+        }
+        // Assert emptiness directly rather than counting batches (each of which was trivially non-null).
+        assertFalse(log.read(values.length, 100, FetchIsolation.LOG_END, true).records.batches().iterator().hasNext(),
+                "Reading beyond the last message returns nothing.");
+    }
+
+    /**
+     * Appends messages at non-sequential offsets (via follower appends) and checks that a read at every offset
+     * from 50 up to, but excluding, the largest appended offset returns the first message at or after the
+     * requested offset, including for offsets that were never appended.
+     */
+    @Test
+    public void testAppendAndReadWithNonSequentialOffsets() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(72).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        // Ascending message ids: 0..49 followed by 50, 57, ..., 197.
+        int[] messageIds = IntStream.concat(
+                IntStream.range(0, 50),
+                IntStream.iterate(50, n -> n + 7).limit(22)).toArray();
+        SimpleRecord[] records = Arrays.stream(messageIds)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .toArray(SimpleRecord[]::new);
+
+        // now test the case that we give the offsets and use non-sequential offsets
+        for (int i = 0; i < records.length; i++) {
+            log.appendAsFollower(
+                    MemoryRecords.withRecords(messageIds[i], Compression.NONE, 0, records[i]),
+                    Integer.MAX_VALUE);
+        }
+
+        int maxId = Arrays.stream(messageIds).max().getAsInt();
+        // messageIds is ascending and offsets are visited in increasing order, so the index of the first
+        // id >= offset only ever moves forward; no need to rescan from the start each iteration.
+        int idx = 0;
+        for (int offset = 50; offset < maxId; offset++) {
+            while (idx < messageIds.length && messageIds[idx] < offset) {
+                idx++;
+            }
+            Record read = log.read(offset, 100, FetchIsolation.LOG_END, true).records.records().iterator().next();
+            assertEquals(messageIds[idx], read.offset(), "Offset read should match message id.");
+            assertEquals(records[idx], new SimpleRecord(read), "Message should match appended.");
+        }
+    }
+
+    /**
+     * This test covers an odd case where we have a gap in the offsets that 
falls at the end of a log segment.
+     * Specifically we create a log where the last message in the first 
segment has offset 0. If we
+     * then read offset 1, we should expect this read to come from the second 
segment, even though the
+     * first segment has the greatest lower bound on the offset.
+     */
+    @Test
+    public void testReadAtLogGap() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(300).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // keep appending until we have two segments with only a single 
message in the second segment
+        while (log.numberOfSegments() == 1) {
+            log.appendAsLeader(singletonRecords("42".getBytes()), 0);
+        }
+
+        // now manually truncate off all but one message from the first 
segment to create a gap in the messages
+        log.logSegments().get(0).truncateTo(1);
+
+        assertEquals(log.logEndOffset() - 1,
+                log.read(1, 200, FetchIsolation.LOG_END, 
true).records.batches().iterator().next().lastOffset(),
+                "A read should now return the last message in the log");
+    }
+
+    /**
+     * Once the log's handlers have been closed, an explicit roll must fail with {@link KafkaStorageException}.
+     */
+    @Test
+    public void testLogRollAfterLogHandlerClosed() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+        log.closeHandlers();
+
+        Optional<Long> expectedNextOffset = Optional.of(1L);
+        assertThrows(KafkaStorageException.class, () -> log.roll(expectedNextOffset));
+    }
+
     private void assertValidLogOffsetMetadata(UnifiedLog log, 
LogOffsetMetadata offsetMetadata) throws IOException {
         assertFalse(offsetMetadata.messageOffsetOnly());
 
@@ -1693,6 +2403,24 @@ public class UnifiedLogTest {
             boolean lastShutdownClean,
             Optional<Uuid> topicId,
             boolean remoteStorageSystemEnable) throws IOException {
+        return createLog(dir, config, logStartOffset, recoveryPoint, 
brokerTopicStats, scheduler, time,
+                producerStateManagerConfig, lastShutdownClean, topicId, 
remoteStorageSystemEnable,
+                
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT);
+    }
+
+    private UnifiedLog createLog(
+            File dir,
+            LogConfig config,
+            long logStartOffset,
+            long recoveryPoint,
+            BrokerTopicStats brokerTopicStats,
+            Scheduler scheduler,
+            MockTime time,
+            ProducerStateManagerConfig producerStateManagerConfig,
+            boolean lastShutdownClean,
+            Optional<Uuid> topicId,
+            boolean remoteStorageSystemEnable,
+            int producerIdExpirationCheckIntervalMs) throws IOException {
 
         UnifiedLog log = UnifiedLog.create(
                 dir,
@@ -1704,7 +2432,7 @@ public class UnifiedLogTest {
                 time,
                 3600000,
                 producerStateManagerConfig,
-                
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                producerIdExpirationCheckIntervalMs,
                 new LogDirFailureChannel(10),
                 lastShutdownClean,
                 topicId,

Reply via email to