This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e3e6858d3f KAFKA-19752 Move parts of UnifiedLogTest to storage module 
(#21912)
4e3e6858d3f is described below

commit 4e3e6858d3f689c3610e8f786c08fce8a1679747
Author: Ken Huang <[email protected]>
AuthorDate: Wed Apr 1 18:28:40 2026 +0800

    KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21912)
    
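    Moves the remaining tests, from
    testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure through
    testStaleProducerEpochReturnsRecoverableErrorForTV2Clients, out of the Scala
    UnifiedLogTest in core and into the Java UnifiedLogTest in the storage module.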
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 619 ---------------------
 .../storage/internals/log/UnifiedLogTest.java      | 551 +++++++++++++++++-
 2 files changed, 536 insertions(+), 634 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
deleted file mode 100755
index 69c3e3efb69..00000000000
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors._
-import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.log.remote.storage.RemoteLogManager
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{doThrow, spy}
-
-import java.io._
-import java.nio.file.Files
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Optional
-import scala.collection.mutable.ListBuffer
-import scala.jdk.CollectionConverters._
-
-class UnifiedLogTest {
-  var config: KafkaConfig = _
-  val brokerTopicStats = new BrokerTopicStats
-  val tmpDir = TestUtils.tempDir()
-  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val mockTime = new MockTime()
-  var logsToClose: Seq[UnifiedLog] = Seq()
-  val producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false)
-  def metricsKeySet = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
-
-  @BeforeEach
-  def setUp(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, port = -1)
-    config = KafkaConfig.fromProps(props)
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    brokerTopicStats.close()
-    logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
-    Utils.delete(tmpDir)
-  }
-
-  def createEmptyLogs(dir: File, offsets: Int*): Unit = {
-    for (offset <- offsets) {
-      Files.createFile(LogFileUtils.logFile(dir, offset).toPath)
-      Files.createFile(LogFileUtils.offsetIndexFile(dir, offset).toPath)
-    }
-  }
-
-  @Test
-  def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): 
Unit = {
-    val logConfig = LogTestUtils.createLogConfig()
-    val log = spy(createLog(logDir, logConfig))
-
-    doThrow(new KafkaStorageException("Injected 
exception")).when(log).flushProducerStateSnapshot(any())
-
-    log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), 0)
-    try {
-      log.roll(Optional.of(1L))
-    } catch {
-      case _: KafkaStorageException => // ignore
-    }
-
-    // check that the recovery point isn't incremented
-    assertEquals(0L, log.recoveryPoint)
-  }
-
-  @Test
-  def testDeletableSegmentsFilter(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    for (_ <- 0 to 8) {
-      val records = TestUtils.records(List(
-        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
-      ))
-      log.appendAsLeader(records, 0)
-      log.roll()
-    }
-    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-
-    assertEquals(10, log.logSegments.size())
-
-    {
-      val deletable = log.deletableSegments(
-        (segment: LogSegment, _: Optional[LogSegment]) => segment.baseOffset 
<= 5)
-      val expected = log.nonActiveLogSegmentsFrom(0L).stream().filter(segment 
=> segment.baseOffset <= 5).toList
-      assertEquals(6, expected.size)
-      assertEquals(expected, deletable)
-    }
-
-    {
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Optional[LogSegment]) => true)
-      val expected = log.nonActiveLogSegmentsFrom(0L).stream().toList
-      assertEquals(9, expected.size)
-      assertEquals(expected, deletable)
-    }
-
-    {
-      val records = TestUtils.records(List(
-        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
-      ))
-      log.appendAsLeader(records, 0)
-      log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-      val deletable = log.deletableSegments((_: LogSegment, _: 
Optional[LogSegment]) => true)
-      val expected = log.logSegments.stream().toList
-      assertEquals(10, expected.size)
-      assertEquals(expected, deletable)
-    }
-  }
-
-  @Test
-  def testDeletableSegmentsIteration(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    for (_ <- 0 to 8) {
-      val records = TestUtils.records(List(
-        new SimpleRecord(mockTime.milliseconds, "a".getBytes),
-      ))
-      log.appendAsLeader(records, 0)
-      log.roll()
-    }
-    log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
-
-    assertEquals(10, log.logSegments.size())
-
-    var offset = 0
-    val deletableSegments = log.deletableSegments(
-      (segment: LogSegment, nextSegmentOpt: Optional[LogSegment]) => {
-        assertEquals(offset, segment.baseOffset)
-        val logSegments = new LogSegments(log.topicPartition)
-        log.logSegments.forEach(segment => logSegments.add(segment))
-        val floorSegmentOpt = logSegments.floorSegment(offset)
-        assertTrue(floorSegmentOpt.isPresent)
-        assertEquals(floorSegmentOpt.get, segment)
-        if (offset == log.logEndOffset) {
-          assertFalse(nextSegmentOpt.isPresent)
-        } else {
-          assertTrue(nextSegmentOpt.isPresent)
-          val higherSegmentOpt = logSegments.higherSegment(segment.baseOffset)
-          assertTrue(higherSegmentOpt.isPresent)
-          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
-          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
-        }
-        offset += 1
-        true
-      })
-    assertEquals(10L, log.logSegments.size())
-    assertEquals(log.nonActiveLogSegmentsFrom(0L).stream.toList, 
deletableSegments)
-  }
-
-  @Test
-  def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage(): 
Unit = {
-    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
segmentIndexBytes = 12,
-      retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0, 
remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    // Append 1 message to the active segment
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes))),
-      0)
-    // Update the highWatermark so that these segments will be eligible for 
deletion.
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(1, log.logSegments.size)
-    assertEquals(0, log.activeSegment.baseOffset())
-
-    mockTime.sleep(2)
-    // It should have rolled the active segment as they are eligible for 
deletion
-    assertEquals(0, log.deleteOldSegments())
-    assertEquals(2, log.logSegments.size)
-    log.logSegments.asScala.zipWithIndex.foreach {
-      case (segment, idx) => assertEquals(idx, segment.baseOffset)
-    }
-
-    // Once rolled, the segment should be uploaded to remote storage and 
eligible for deletion
-    log.updateHighestOffsetInRemoteStorage(1)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.logSegments.asScala.head.baseOffset())
-    assertEquals(1, log.localLogStartOffset())
-    assertEquals(1, log.logEndOffset)
-    assertEquals(0, log.logStartOffset)
-  }
-
-  @Test
-  def 
testSegmentDeletionEnabledBeforeUploadToRemoteTierWhenLogStartOffsetMovedAhead():
 Unit = {
-    val logConfig = LogTestUtils.createLogConfig(retentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-    val pid = 1L
-    val epoch = 0.toShort
-
-    assertTrue(log.isEmpty)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 0), 0)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 1), 0)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 2), 0)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("d".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 3), 1)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("e".getBytes)),
-      producerId = pid, producerEpoch = epoch, sequence = 4), 2)
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(2, log.logSegments.size)
-
-    // No segments are uploaded to remote storage, none of the local log 
segments should be eligible for deletion
-    log.updateHighestOffsetInRemoteStorage(-1L)
-    assertEquals(0, log.deleteOldSegments())
-    mockTime.sleep(1)
-    assertEquals(2, log.logSegments.size)
-    assertFalse(log.isEmpty)
-
-    // Update the log-start-offset from 0 to 3, then the base segment should 
not be eligible for deletion
-    log.updateLogStartOffsetFromRemoteTier(3L)
-    assertEquals(0, log.deleteOldSegments())
-    mockTime.sleep(1)
-    assertEquals(2, log.logSegments.size)
-    assertFalse(log.isEmpty)
-
-    // Update the log-start-offset from 3 to 4, then the base segment should 
be eligible for deletion now even
-    // if it is not uploaded to remote storage
-    log.updateLogStartOffsetFromRemoteTier(4L)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    mockTime.sleep(1)
-    assertEquals(1, log.logSegments.size)
-    assertFalse(log.isEmpty)
-
-    log.updateLogStartOffsetFromRemoteTier(5L)
-    assertEquals(0, log.deleteOldSegments())
-    mockTime.sleep(1)
-    assertEquals(1, log.logSegments.size)
-    assertTrue(log.isEmpty)
-  }
-
-  @Test
-  def 
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes():
 Unit = {
-    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
-    val segmentBytes = createRecords.sizeInBytes()
-    val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, retentionBytes = 1,
-      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, retentionBytesConfig, 
remoteStorageSystemEnable = true)
-
-    // Given 6 segments of 1 message each
-    for (_ <- 0 until 6) {
-      log.appendAsLeader(createRecords, 0)
-    }
-    assertEquals(6, log.logSegments.size)
-
-    log.updateHighWatermark(log.logEndOffset)
-    // simulate calls to upload 2 segments to remote storage
-    log.updateHighestOffsetInRemoteStorage(1)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(4, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(2, log.localLogStartOffset())
-  }
-
-  @Test
-  def 
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs():
 Unit = {
-    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
-    val segmentBytes = createRecords.sizeInBytes()
-    val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, retentionMs = 1000,
-      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, retentionBytesConfig, 
remoteStorageSystemEnable = true)
-
-    // Given 6 segments of 1 message each
-    for (_ <- 0 until 6) {
-      log.appendAsLeader(createRecords, 0)
-    }
-    assertEquals(6, log.logSegments.size)
-
-    log.updateHighWatermark(log.logEndOffset)
-    // simulate calls to upload 2 segments to remote storage
-    log.updateHighestOffsetInRemoteStorage(1)
-
-    mockTime.sleep(1001)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(4, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(2, log.localLogStartOffset())
-  }
-
-  @Test
-  def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
-    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
-    val segmentBytes = createRecords.sizeInBytes()
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, 
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
-          fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    // Given 6 segments of 1 message each
-    for (_ <- 0 until 6) {
-      log.appendAsLeader(createRecords, 0)
-    }
-    assertEquals(6, log.logSegments.size)
-
-    log.updateHighWatermark(log.logEndOffset)
-
-    // Should not delete local log because highest remote storage offset is -1 
(default value)
-    assertEquals(0, log.deleteOldSegments())
-    assertEquals(6, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(0, log.localLogStartOffset())
-
-    // simulate calls to upload 2 segments to remote storage
-    log.updateHighestOffsetInRemoteStorage(1)
-
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(4, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(2, log.localLogStartOffset())
-
-    // add remoteCopyDisabled = true
-    val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
-      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, 
remoteLogCopyDisable = true)
-    log.updateConfig(copyDisabledLogConfig)
-
-    // No local logs will be deleted even though local retention bytes is 1 
because we'll adopt retention.ms/bytes
-    // when remote.log.copy.disable = true
-    assertEquals(0, log.deleteOldSegments())
-    assertEquals(4, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(2, log.localLogStartOffset())
-
-    // simulate the remote logs are all deleted due to retention policy
-    log.updateLogStartOffsetFromRemoteTier(2)
-    assertEquals(4, log.logSegments.size())
-    assertEquals(2, log.logStartOffset)
-    assertEquals(2, log.localLogStartOffset())
-
-    // produce 3 more segments
-    for (_ <- 0 until 3) {
-      log.appendAsLeader(createRecords, 0)
-    }
-    assertEquals(7, log.logSegments.size)
-    log.updateHighWatermark(log.logEndOffset)
-
-    // try to delete local logs again, 2 segments will be deleted this time 
because we'll adopt retention.ms/bytes (retention.bytes = 5)
-    // when remote.log.copy.disable = true
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(5, log.logSegments.size())
-    assertEquals(4, log.logStartOffset)
-    assertEquals(4, log.localLogStartOffset())
-
-    // add localRetentionMs = 1, retentionMs = 1000
-    val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionMs = 1, retentionMs = 1000,
-      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, 
remoteLogCopyDisable = true)
-    log.updateConfig(retentionMsConfig)
-
-    // Should not delete any logs because no local logs expired using 
retention.ms = 1000
-    mockTime.sleep(10)
-    assertEquals(0, log.deleteOldSegments())
-    assertEquals(5, log.logSegments.size())
-    assertEquals(4, log.logStartOffset)
-    assertEquals(4, log.localLogStartOffset())
-
-    // Should delete all logs because all of them are expired based on 
retentionMs = 1000
-    mockTime.sleep(1000)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(1, log.logSegments.size())
-    assertEquals(9, log.logStartOffset)
-    assertEquals(9, log.localLogStartOffset())
-  }
-
-  @Test
-  def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    var offset = 0L
-    for(_ <- 0 until 50) {
-      val records = TestUtils.singletonRecords("test".getBytes())
-      val info = log.appendAsLeader(records, 0)
-      offset = info.lastOffset
-      if (offset != 0 && offset % 10 == 0)
-        log.roll()
-    }
-    assertEquals(5, log.logSegments.size)
-    log.updateHighWatermark(log.logEndOffset)
-    // simulate calls to upload 3 segments to remote storage
-    log.updateHighestOffsetInRemoteStorage(30)
-
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(2, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(31, log.localLogStartOffset())
-  }
-
-  @Test
-  def testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    var offset = 0L
-    for(_ <- 0 until 50) {
-      val records = TestUtils.singletonRecords("test".getBytes())
-      val info = log.appendAsLeader(records, 0)
-      offset = info.lastOffset
-      if (offset != 0 && offset % 10 == 0)
-        log.roll()
-    }
-    assertEquals(5, log.logSegments.size)
-    log.updateHighWatermark(log.logEndOffset)
-    // simulate calls to upload 3 segments to remote storage
-    log.updateHighestOffsetInRemoteStorage(30)
-
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-    assertEquals(2, log.logSegments.size())
-    assertEquals(0, log.logStartOffset)
-    assertEquals(31, log.localLogStartOffset())
-
-    log.updateLogStartOffsetFromRemoteTier(15)
-    assertEquals(15, log.logStartOffset)
-
-    // case-1: offset is higher than the local-log-start-offset.
-    // log-start-offset < local-log-start-offset < offset-to-be-converted < 
log-end-offset
-    assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35))
-    // case-2: offset is less than the local-log-start-offset
-    // log-start-offset < offset-to-be-converted < local-log-start-offset < 
log-end-offset
-    assertEquals(new LogOffsetMetadata(29, -1L, -1), 
log.maybeConvertToOffsetMetadata(29))
-    // case-3: offset is higher than the log-end-offset
-    // log-start-offset < local-log-start-offset < log-end-offset < 
offset-to-be-converted
-    assertEquals(new LogOffsetMetadata(log.logEndOffset + 1, -1L, -1), 
log.maybeConvertToOffsetMetadata(log.logEndOffset + 1))
-    // case-4: offset is less than the log-start-offset
-    // offset-to-be-converted < log-start-offset < local-log-start-offset < 
log-end-offset
-    assertEquals(new LogOffsetMetadata(14, -1L, -1), 
log.maybeConvertToOffsetMetadata(14))
-  }
-
-  @Test
-  def testGetFirstBatchTimestampForSegments(): Unit = {
-    val log = createLog(logDir, LogTestUtils.createLogConfig())
-
-    val segments: util.List[LogSegment] = new util.ArrayList[LogSegment]()
-    val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
-    val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)
-    segments.add(seg1)
-    segments.add(seg2)
-    assertEquals(Seq(Long.MaxValue, Long.MaxValue), 
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
-
-    seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new 
SimpleRecord(1000L, "one".getBytes)))
-    seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new 
SimpleRecord(2000L, "two".getBytes)))
-    assertEquals(Seq(1000L, 2000L), 
log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
-
-    seg1.close()
-    seg2.close()
-  }
-
-  @Test
-  def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit = 
{
-    val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-    val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), 
Optional.empty)
-    assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), 
result)
-  }
-
-
-  private def createLog(dir: File,
-                        config: LogConfig,
-                        brokerTopicStats: BrokerTopicStats = brokerTopicStats,
-                        logStartOffset: Long = 0L,
-                        recoveryPoint: Long = 0L,
-                        scheduler: Scheduler = mockTime.scheduler,
-                        time: Time = mockTime,
-                        maxTransactionTimeoutMs: Int = 60 * 60 * 1000,
-                        producerStateManagerConfig: ProducerStateManagerConfig 
= producerStateManagerConfig,
-                        producerIdExpirationCheckIntervalMs: Int = 
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
-                        lastShutdownClean: Boolean = true,
-                        topicId: Option[Uuid] = None,
-                        remoteStorageSystemEnable: Boolean = false,
-                        remoteLogManager: Option[RemoteLogManager] = None,
-                        logOffsetsListener: LogOffsetsListener = 
LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
-    val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, 
time, logStartOffset, recoveryPoint,
-      maxTransactionTimeoutMs, producerStateManagerConfig, 
producerIdExpirationCheckIntervalMs,
-      lastShutdownClean, topicId, new ConcurrentHashMap[String, Integer],
-      remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
-    logsToClose = logsToClose :+ log
-    log
-  }
-
-  case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
-
-  @Test
-  def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
-    // Producer epoch gets incremented (coordinator fail over, completed 
transaction, etc.)
-    // and client has stale cached epoch. Fix prevents fatal 
InvalidTxnStateException.
-    
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-    
-    val producerId = 123L
-    val oldEpoch = 5.toShort
-    val newEpoch = 6.toShort
-    
-    // Step 1: Simulate a scenario where producer epoch was incremented to 
fence the producer
-    val previousRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, newEpoch, 0,
-      new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
-    )
-    val previousGuard = log.maybeStartTransactionVerification(producerId, 0, 
newEpoch, false)  // TV1 = supportsEpochBump = false
-    log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, previousGuard,
-      TransactionVersion.TV_1.featureLevel())
-    
-    // Complete the transaction normally (commits do update producer state 
with current epoch)
-    val commitMarker = MemoryRecords.withEndTransactionMarker(
-      producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 
0)
-    )
-    log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching, VerificationGuard.SENTINEL,
-      TransactionVersion.TV_1.featureLevel())
-    
-    // Step 2: TV1 client tries to write with stale cached epoch (before 
learning about epoch increment)  
-    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, oldEpoch, 0,
-      new SimpleRecord("stale-epoch-key".getBytes, 
"stale-epoch-value".getBytes)
-    )
-    
-    // Step 3: Verify our fix - should get InvalidProducerEpochException 
(recoverable), not InvalidTxnStateException (fatal)
-    val exception = assertThrows(classOf[InvalidProducerEpochException], () => 
{
-      val staleGuard = log.maybeStartTransactionVerification(producerId, 0, 
oldEpoch, false)  
-      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, staleGuard,
-        TransactionVersion.TV_1.featureLevel())
-     })
-     
-     // Verify the error message indicates epoch mismatch  
-     assertTrue(exception.getMessage.contains("smaller than the last seen 
epoch"))
-     assertTrue(exception.getMessage.contains(s"$oldEpoch"))
-     assertTrue(exception.getMessage.contains(s"$newEpoch"))
-  }
-
-  @Test
-  def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
-    // Check producer epoch FIRST - if stale, return recoverable error before 
verification checks.
-    
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-    
-    val producerId = 456L
-    val originalEpoch = 3.toShort
-    val bumpedEpoch = 4.toShort
-    
-    // Step 1: Start transaction with epoch 3 (before timeout)
-    val initialRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, originalEpoch, 0,
-      new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
-    )
-    val initialGuard = log.maybeStartTransactionVerification(producerId, 0, 
originalEpoch, true)  // TV2 = supportsEpochBump = true
-    log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, initialGuard,
-      TransactionVersion.TV_2.featureLevel())
-    
-    // Step 2: Coordinator times out and aborts transaction
-    // TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort 
marker with epoch 4
-    val abortMarker = MemoryRecords.withEndTransactionMarker(
-      producerId, bumpedEpoch, new 
EndTransactionMarker(ControlRecordType.ABORT, 0)
-    )
-    log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, 
RequestLocal.noCaching, VerificationGuard.SENTINEL,
-      TransactionVersion.TV_2.featureLevel())
-    
-    // Step 3: TV2 transactional producer tries to append with stale epoch 
(timeout recovery scenario)
-    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE, producerId, originalEpoch, 0,
-      new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
-    )
-    
-    // Step 4: Verify our fix works for TV2 - should get 
InvalidProducerEpochException (recoverable), not InvalidTxnStateException 
(fatal)
-    val exception = assertThrows(classOf[InvalidProducerEpochException], () => 
{
-      val staleGuard = log.maybeStartTransactionVerification(producerId, 0, 
originalEpoch, true)  // TV2 = supportsEpochBump = true
-      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, staleGuard,
-        TransactionVersion.TV_2.featureLevel())
-     })
-     
-     // Verify the error message indicates epoch mismatch (3 < 4)
-     assertTrue(exception.getMessage.contains("smaller than the last seen 
epoch"))
-     assertTrue(exception.getMessage.contains(s"$originalEpoch"))
-     assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
-  }
-}
-
-object UnifiedLogTest {
-  def allRecords(log: UnifiedLog): List[Record] = {
-    val recordsFound = ListBuffer[Record]()
-    for (logSegment <- log.logSegments.asScala) {
-      for (batch <- logSegment.log.batches.asScala) {
-        recordsFound ++= batch.iterator().asScala
-      }
-    }
-    recordsFound.toList
-  }
-
-  def verifyRecordsInLog(log: UnifiedLog, expectedRecords: List[Record]): Unit 
= {
-    assertEquals(expectedRecords, allRecords(log))
-  }
-}
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index eb6957f13ac..8b0d8d9abe0 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -96,6 +96,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.security.DigestException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -115,6 +116,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -132,6 +134,7 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -1969,21 +1972,21 @@ public class UnifiedLogTest {
         long pid = 1L;
         short epoch = 0;
 
-        int[] seq = {0};
+        AtomicInteger seq = new AtomicInteger(0);
         // Pad the beginning of the log.
         for (int i = 0; i <= 5; i++) {
             MemoryRecords record = LogTestUtils.records(
                     List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
-                    pid, epoch, seq[0], 0L);
+                    pid, epoch, seq.get(), 0L);
             log.appendAsLeader(record, 0);
-            seq[0]++;
+            seq.incrementAndGet();
         }
         // Append an entry with multiple log records.
         Supplier<MemoryRecords> createRecords = () -> 
LogTestUtils.records(List.of(
-                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
-                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
-                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
-        ), pid, epoch, seq[0], 0L);
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq.get()).getBytes(), ("value-" + seq.get()).getBytes())
+        ), pid, epoch, seq.get(), 0L);
         LogAppendInfo multiEntryAppendInfo = 
log.appendAsLeader(createRecords.get(), 0);
         assertEquals(3, multiEntryAppendInfo.lastOffset() - 
multiEntryAppendInfo.firstOffset() + 1,
                 "should have appended 3 entries");
@@ -1995,13 +1998,13 @@ public class UnifiedLogTest {
         assertEquals(multiEntryAppendInfo.lastOffset(), 
dupMultiEntryAppendInfo.lastOffset(),
                 "Somehow appended a duplicate entry with multiple log records 
to the tail");
 
-        seq[0] += 3;
+        seq.addAndGet(3);
 
         // Append a partial duplicate of the tail. This is not allowed.
         MemoryRecords partialDup = LogTestUtils.records(List.of(
-                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
-                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
-        ), pid, epoch, seq[0] - 2, 0L);
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq.get()).getBytes(), ("value-" + seq.get()).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq.get()).getBytes(), ("value-" + seq.get()).getBytes())
+        ), pid, epoch, seq.get() - 2, 0L);
         assertThrows(OutOfOrderSequenceException.class, () -> 
log.appendAsLeader(partialDup, 0),
                 () -> "Should have received an OutOfOrderSequenceException 
since we attempted to append a duplicate of a records in the middle of the 
log.");
 
@@ -2022,7 +2025,7 @@ public class UnifiedLogTest {
         // Append a duplicate entry with a single records at the tail of the 
log. This should return the appendInfo of the original entry.
         Supplier<MemoryRecords> createRecordsWithDuplicate = () -> 
LogTestUtils.records(
                 List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
-                pid, epoch, seq[0], 0L);
+                pid, epoch, seq.get(), 0L);
         LogAppendInfo origAppendInfo = 
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
         LogAppendInfo newAppendInfo = 
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
         assertEquals(origAppendInfo.firstOffset(), 
newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
@@ -4999,15 +5002,15 @@ public class UnifiedLogTest {
     }
 
     private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer 
buffer, long producerId, short producerEpoch, int leaderEpoch) {
-        int[] sequence = {0};
+        AtomicInteger sequence = new AtomicInteger(0);
         return (offset, numRecords) -> {
-            int baseSequence = sequence[0];
+            int baseSequence = sequence.get();
             MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
                 offset, mockTime.milliseconds(), producerId, producerEpoch, 
baseSequence, true, leaderEpoch);
             for (int seq = baseSequence; seq < baseSequence + numRecords; 
seq++) {
                 builder.append(new 
SimpleRecord(String.valueOf(seq).getBytes()));
             }
-            sequence[0] += numRecords;
+            sequence.addAndGet(numRecords);
             builder.close();
         };
     }
@@ -5494,4 +5497,522 @@ public class UnifiedLogTest {
             assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
         }
     }
+
+    /**
+     * Rolls a spied log whose producer state snapshot flush always throws and verifies that
+     * the storage exception surfaces from roll() while the recovery point stays at 0.
+     */
+    @Test
+    public void testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure() throws IOException {
+        UnifiedLog spiedLog = spy(createLog(logDir, new LogTestUtils.LogConfigBuilder().build()));
+
+        // Every attempt to flush a producer state snapshot fails with a storage error.
+        KafkaStorageException injected = new KafkaStorageException("Injected exception");
+        doThrow(injected).when(spiedLog).flushProducerStateSnapshot(any(Path.class));
+
+        spiedLog.appendAsLeader(singletonRecords("a".getBytes()), 0);
+        assertThrows(KafkaStorageException.class, () -> spiedLog.roll(Optional.of(1L)));
+
+        // The failed roll must not have moved the recovery point forward.
+        assertEquals(0L, spiedLog.recoveryPoint());
+    }
+
+    /**
+     * Verifies that deletableSegments returns exactly the segments accepted by the supplied predicate:
+     * a base-offset filter yields the six matching non-active segments, an always-true predicate yields
+     * all nine non-active segments, and after one more append (with the high watermark advanced)
+     * the always-true predicate yields all ten segments.
+     */
+    @Test
+    public void testDeletableSegmentsFilter() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build());
+
+        // Nine append-then-roll rounds: nine rolled segments plus the current active one.
+        for (int round = 0; round < 9; round++) {
+            log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+            log.roll();
+        }
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+        assertEquals(10, log.logSegments().size());
+
+        // Predicate on the base offset: only segments whose base offset is at most 5 qualify.
+        List<LogSegment> filteredDeletable = log.deletableSegments((segment, next) -> segment.baseOffset() <= 5);
+        List<LogSegment> filteredExpected = new ArrayList<>();
+        for (LogSegment candidate : log.nonActiveLogSegmentsFrom(0L)) {
+            if (candidate.baseOffset() <= 5) {
+                filteredExpected.add(candidate);
+            }
+        }
+        assertEquals(6, filteredExpected.size());
+        assertEquals(filteredExpected, filteredDeletable);
+
+        // Accept-everything predicate: all non-active segments are returned.
+        List<LogSegment> allNonActiveDeletable = log.deletableSegments((segment, next) -> true);
+        List<LogSegment> allNonActiveExpected = new ArrayList<>(log.nonActiveLogSegmentsFrom(0L));
+        assertEquals(9, allNonActiveExpected.size());
+        assertEquals(allNonActiveExpected, allNonActiveDeletable);
+
+        // One more record goes into the active segment; with the high watermark advanced,
+        // the accept-everything predicate is expected to return every segment.
+        log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+        List<LogSegment> everythingDeletable = log.deletableSegments((segment, next) -> true);
+        List<LogSegment> everythingExpected = new ArrayList<>(log.logSegments());
+        assertEquals(10, everythingExpected.size());
+        assertEquals(everythingExpected, everythingDeletable);
+    }
+
+    @Test
+    public void testDeletableSegmentsIteration() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        for (int i = 0; i <= 8; i++) {
+            MemoryRecords records = records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+            log.appendAsLeader(records, 0);
+            log.roll();
+        }
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+        assertEquals(10, log.logSegments().size());
+
+        AtomicInteger offset = new AtomicInteger(0);
+        List<LogSegment> deletableSegments = log.deletableSegments((segment, 
nextSegmentOpt) -> {
+            assertEquals(offset.get(), segment.baseOffset());
+            LogSegments logSegments = new LogSegments(log.topicPartition());
+            log.logSegments().forEach(logSegments::add);
+            Optional<LogSegment> floorSegmentOpt = 
logSegments.floorSegment(offset.get());
+            assertTrue(floorSegmentOpt.isPresent());
+            assertEquals(floorSegmentOpt.get(), segment);
+            if (offset.get() == log.logEndOffset()) {
+                assertFalse(nextSegmentOpt.isPresent());
+            } else {
+                assertTrue(nextSegmentOpt.isPresent());
+                Optional<LogSegment> higherSegmentOpt = 
logSegments.higherSegment(segment.baseOffset());
+                assertTrue(higherSegmentOpt.isPresent());
+                assertEquals(segment.baseOffset() + 1, 
higherSegmentOpt.get().baseOffset());
+                assertEquals(higherSegmentOpt.get(), nextSegmentOpt.get());
+            }
+            offset.addAndGet(1);
+            return true;
+        });
+        assertEquals(10L, log.logSegments().size());
+        assertEquals(new ArrayList<>(log.nonActiveLogSegmentsFrom(0L)), 
deletableSegments);
+    }
+
+    @Test
+    public void 
testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .indexIntervalBytes(1)
+                .segmentIndexBytes(12)
+                .retentionMs(3)
+                .localRetentionMs(1)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        // Append 1 message to the active segment
+        log.appendAsLeader(records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        // Update the highWatermark so that these segments will be eligible 
for deletion.
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(1, log.logSegments().size());
+        assertEquals(0, log.activeSegment().baseOffset());
+
+        mockTime.sleep(2);
+        // It should have rolled the active segment as they are eligible for 
deletion
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(2, log.logSegments().size());
+        AtomicInteger idx = new AtomicInteger(0);
+        log.logSegments().forEach(segment -> assertEquals(idx.getAndAdd(1), 
segment.baseOffset()));
+
+        // Once rolled, the segment should be uploaded to remote storage and 
eligible for deletion
+        log.updateHighestOffsetInRemoteStorage(1);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(1, log.logSegments().size());
+        assertEquals(1, log.logSegments().iterator().next().baseOffset());
+        assertEquals(1, log.localLogStartOffset());
+        assertEquals(1, log.logEndOffset());
+        assertEquals(0, log.logStartOffset());
+    }
+
+    /**
+     * With remote storage enabled and nothing uploaded, local segments are not deleted until the
+     * log start offset (set via updateLogStartOffsetFromRemoteTier) reaches offset 4; at that point
+     * the first segment is deleted even though it was never uploaded.
+     */
+    @Test
+    public void testSegmentDeletionEnabledBeforeUploadToRemoteTierWhenLogStartOffsetMovedAhead() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .retentionBytes(1)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+        long producerId = 1L;
+        short producerEpoch = 0;
+
+        assertTrue(log.isEmpty());
+        // First segment: offsets 0..3 (leader epochs 0, 0, 0, 1).
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("a".getBytes())), producerId, producerEpoch, 0, 0L), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("b".getBytes())), producerId, producerEpoch, 1, 0L), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("c".getBytes())), producerId, producerEpoch, 2, 0L), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("d".getBytes())), producerId, producerEpoch, 3, 0L), 1);
+        log.roll();
+        // Second segment: offset 4 (leader epoch 2).
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("e".getBytes())), producerId, producerEpoch, 4, 0L), 2);
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(2, log.logSegments().size());
+
+        // Nothing has been uploaded to remote storage, so no local segment may be deleted.
+        log.updateHighestOffsetInRemoteStorage(-1L);
+        assertEquals(0, log.deleteOldSegments());
+        mockTime.sleep(1);
+        assertEquals(2, log.logSegments().size());
+        assertFalse(log.isEmpty());
+
+        // Log start offset 0 -> 3: the first segment is still not eligible for deletion.
+        log.updateLogStartOffsetFromRemoteTier(3L);
+        assertEquals(0, log.deleteOldSegments());
+        mockTime.sleep(1);
+        assertEquals(2, log.logSegments().size());
+        assertFalse(log.isEmpty());
+
+        // Log start offset 3 -> 4: the first segment becomes eligible for deletion
+        // although it has not been uploaded to remote storage.
+        log.updateLogStartOffsetFromRemoteTier(4L);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        mockTime.sleep(1);
+        assertEquals(1, log.logSegments().size());
+        assertFalse(log.isEmpty());
+
+        // Log start offset 4 -> 5: nothing further is deleted, and the log now reports empty.
+        log.updateLogStartOffsetFromRemoteTier(5L);
+        assertEquals(0, log.deleteOldSegments());
+        mockTime.sleep(1);
+        assertEquals(1, log.logSegments().size());
+        assertTrue(log.isEmpty());
+    }
+
+    /**
+     * Remote storage enabled, retentionBytes = 1 and no explicit local retention size: after the
+     * highest remote offset is set to 1, deleteOldSegments removes the two uploaded segments locally
+     * and the log start offset stays at 0.
+     */
+    @Test
+    public void testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes() throws IOException {
+        // Size the segments so that each one holds exactly one single-record batch.
+        MemoryRecords sample = records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+        int oneBatchBytes = sample.sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(oneBatchBytes)
+                .retentionBytes(1)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        // Six appends produce six segments of one message each.
+        for (int appended = 0; appended < 6; appended++) {
+            log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        }
+        assertEquals(6, log.logSegments().size());
+
+        log.updateHighWatermark(log.logEndOffset());
+        // Pretend the first two segments (offsets 0 and 1) were uploaded to remote storage.
+        log.updateHighestOffsetInRemoteStorage(1);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(4, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(2, log.localLogStartOffset());
+    }
+
+    /**
+     * Remote storage enabled, retentionMs = 1000 and no explicit local retention time: once the
+     * clock has advanced 1001 ms, deleteOldSegments removes the two uploaded segments locally and
+     * the log start offset stays at 0.
+     */
+    @Test
+    public void testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs() throws IOException {
+        // Size the segments so that each one holds exactly one single-record batch.
+        MemoryRecords sample = records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+        int oneBatchBytes = sample.sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(oneBatchBytes)
+                .retentionMs(1000)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        // Six appends produce six segments of one message each.
+        for (int appended = 0; appended < 6; appended++) {
+            log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        }
+        assertEquals(6, log.logSegments().size());
+
+        log.updateHighWatermark(log.logEndOffset());
+        // Pretend the first two segments (offsets 0 and 1) were uploaded to remote storage.
+        log.updateHighestOffsetInRemoteStorage(1);
+
+        // Step just past the 1000 ms retention time before asking for deletion.
+        mockTime.sleep(1001);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(4, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(2, log.localLogStartOffset());
+    }
+
+    /**
+     * Walks a remote-storage-enabled log through three configurations and checks which retention
+     * settings drive local deletion in each:
+     * 1. copy enabled: local retention bytes apply, but only to segments already uploaded;
+     * 2. remoteLogCopyDisable = true: retentionBytes (five segments' worth) applies instead of
+     *    localRetentionBytes;
+     * 3. remoteLogCopyDisable = true with time-based settings: retentionMs (1000) applies instead of
+     *    localRetentionMs (1).
+     */
+    @Test
+    public void testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled() throws IOException {
+        // Size the segments so that each one holds exactly one single-record batch.
+        MemoryRecords sample = records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes())));
+        int oneBatchBytes = sample.sizeInBytes();
+        LogConfig copyEnabledConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(oneBatchBytes)
+                .localRetentionBytes(1)
+                .retentionBytes((long) oneBatchBytes * 5)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, copyEnabledConfig, true);
+
+        // Six appends produce six segments of one message each.
+        for (int appended = 0; appended < 6; appended++) {
+            log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        }
+        assertEquals(6, log.logSegments().size());
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        // The highest remote storage offset is still at its default of -1, so nothing is deleted locally.
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(6, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(0, log.localLogStartOffset());
+
+        // Pretend the first two segments (offsets 0 and 1) were uploaded to remote storage.
+        log.updateHighestOffsetInRemoteStorage(1);
+
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(4, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(2, log.localLogStartOffset());
+
+        // Same settings as before, plus remoteLogCopyDisable = true.
+        LogConfig copyDisabledConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(oneBatchBytes)
+                .localRetentionBytes(1)
+                .retentionBytes((long) oneBatchBytes * 5)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .remoteLogCopyDisable(true)
+                .build();
+        log.updateConfig(copyDisabledConfig);
+
+        // Although local retention bytes is 1, nothing is deleted: with remote.log.copy.disable = true
+        // retention.ms/bytes are used, and four segments fit within retentionBytes.
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(4, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(2, log.localLogStartOffset());
+
+        // Simulate the remote copies having been removed by the retention policy.
+        log.updateLogStartOffsetFromRemoteTier(2);
+        assertEquals(4, log.logSegments().size());
+        assertEquals(2, log.logStartOffset());
+        assertEquals(2, log.localLogStartOffset());
+
+        // Three more appends produce three more segments.
+        for (int appended = 0; appended < 3; appended++) {
+            log.appendAsLeader(records(List.of(new SimpleRecord(mockTime.milliseconds(), "a".getBytes()))), 0);
+        }
+        assertEquals(7, log.logSegments().size());
+        log.updateHighWatermark(log.logEndOffset());
+
+        // Seven segments exceed retentionBytes (five segments' worth), so two are deleted this time;
+        // retention.ms/bytes are in effect because remote.log.copy.disable = true.
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(5, log.logSegments().size());
+        assertEquals(4, log.logStartOffset());
+        assertEquals(4, log.localLogStartOffset());
+
+        // Switch to time-based settings: localRetentionMs = 1, retentionMs = 1000.
+        LogConfig timeBasedConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(oneBatchBytes)
+                .localRetentionMs(1)
+                .retentionMs(1000)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .remoteLogCopyDisable(true)
+                .build();
+        log.updateConfig(timeBasedConfig);
+
+        // 10 ms exceeds localRetentionMs but not retention.ms = 1000, so nothing is deleted.
+        mockTime.sleep(10);
+        assertEquals(0, log.deleteOldSegments());
+        assertEquals(5, log.logSegments().size());
+        assertEquals(4, log.logStartOffset());
+        assertEquals(4, log.localLogStartOffset());
+
+        // After another 1000 ms every record is older than retentionMs = 1000; only one segment remains.
+        mockTime.sleep(1000);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(1, log.logSegments().size());
+        assertEquals(9, log.logStartOffset());
+        assertEquals(9, log.localLogStartOffset());
+    }
+
+    @Test
+    public void testIncrementLocalLogStartOffsetAfterLocalLogDeletion() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .localRetentionBytes(1)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        long offset;
+        for (int i = 0; i < 50; i++) {
+            MemoryRecords records = singletonRecords("test".getBytes());
+            LogAppendInfo info = log.appendAsLeader(records, 0);
+            offset = info.lastOffset();
+            if (offset != 0 && offset % 10 == 0)
+                log.roll();
+        }
+        assertEquals(5, log.logSegments().size());
+        log.updateHighWatermark(log.logEndOffset());
+        // simulate calls to upload 3 segments to remote storage
+        log.updateHighestOffsetInRemoteStorage(30);
+
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(2, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(31, log.localLogStartOffset());
+    }
+
+    @Test
+    public void testConvertToOffsetMetadataDoesNotThrowOffsetOutOfRangeError() 
throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .localRetentionBytes(1)
+                .fileDeleteDelayMs(0)
+                .remoteLogStorageEnable(true)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        long offset;
+        for (int i = 0; i < 50; i++) {
+            MemoryRecords records = singletonRecords("test".getBytes());
+            LogAppendInfo info = log.appendAsLeader(records, 0);
+            offset = info.lastOffset();
+            if (offset != 0 && offset % 10 == 0)
+                log.roll();
+        }
+        assertEquals(5, log.logSegments().size());
+        log.updateHighWatermark(log.logEndOffset());
+        // simulate calls to upload 3 segments to remote storage
+        log.updateHighestOffsetInRemoteStorage(30);
+
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        assertEquals(2, log.logSegments().size());
+        assertEquals(0, log.logStartOffset());
+        assertEquals(31, log.localLogStartOffset());
+
+        log.updateLogStartOffsetFromRemoteTier(15);
+        assertEquals(15, log.logStartOffset());
+
+        // case-1: offset is higher than the local-log-start-offset.
+        // log-start-offset < local-log-start-offset < offset-to-be-converted 
< log-end-offset
+        assertEquals(new LogOffsetMetadata(35, 31, 288), 
log.maybeConvertToOffsetMetadata(35));
+        // case-2: offset is less than the local-log-start-offset
+        // log-start-offset < offset-to-be-converted < local-log-start-offset 
< log-end-offset
+        assertEquals(new LogOffsetMetadata(29, -1L, -1), 
log.maybeConvertToOffsetMetadata(29));
+        // case-3: offset is higher than the log-end-offset
+        // log-start-offset < local-log-start-offset < log-end-offset < 
offset-to-be-converted
+        assertEquals(new LogOffsetMetadata(log.logEndOffset() + 1, -1L, -1), 
log.maybeConvertToOffsetMetadata(log.logEndOffset() + 1));
+        // case-4: offset is less than the log-start-offset
+        // offset-to-be-converted < log-start-offset < local-log-start-offset 
< log-end-offset
+        assertEquals(new LogOffsetMetadata(14, -1L, -1), 
log.maybeConvertToOffsetMetadata(14));
+    }
+
+    /**
+     * Verifies getFirstBatchTimestampForSegments: a segment without any batch reports
+     * Long.MAX_VALUE, and a segment with data reports the timestamp of its first batch.
+     *
+     * The two stand-alone segments are closed in finally blocks so that a failing assertion
+     * (or a failing append) does not leak them.
+     */
+    @Test
+    public void testGetFirstBatchTimestampForSegments() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().build());
+
+        LogSegment seg1 = LogTestUtils.createSegment(1, logDir, 10, mockTime);
+        try {
+            LogSegment seg2 = LogTestUtils.createSegment(2, logDir, 10, mockTime);
+            try {
+                List<LogSegment> segments = new ArrayList<>();
+                segments.add(seg1);
+                segments.add(seg2);
+
+                // Both segments are still empty.
+                List<Long> timestamps = new ArrayList<>(log.getFirstBatchTimestampForSegments(segments));
+                assertEquals(List.of(Long.MAX_VALUE, Long.MAX_VALUE), timestamps);
+
+                seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes())));
+                seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes())));
+
+                // Each segment now reports the timestamp of the single batch it holds.
+                timestamps = new ArrayList<>(log.getFirstBatchTimestampForSegments(segments));
+                assertEquals(List.of(1000L, 2000L), timestamps);
+            } finally {
+                seg2.close();
+            }
+        } finally {
+            seg1.close();
+        }
+    }
+
+    /**
+     * On an empty log with remote storage enabled, fetchOffsetByTimestamp returns a holder in which
+     * both optionals are empty.
+     */
+    @Test
+    public void testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty() throws IOException {
+        LogConfig remoteEnabledConfig = new LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+        UnifiedLog log = createLog(logDir, remoteEnabledConfig, true);
+
+        OffsetResultHolder actual = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Optional.empty());
+
+        OffsetResultHolder nothingFound = new OffsetResultHolder(Optional.empty(), Optional.empty());
+        assertEquals(nothingFound, actual);
+    }
+
+    /**
+     * TV1 scenario: the log has already seen the producer at epoch 6 (transactional write plus commit
+     * marker) when a client appends with the older epoch 5. The append must fail with
+     * InvalidProducerEpochException (recoverable) rather than InvalidTxnStateException (fatal).
+     */
+    @Test
+    public void testStaleProducerEpochReturnsRecoverableErrorForTV1Clients() throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        long producerId = 123L;
+        short staleEpoch = 5;
+        short currentEpoch = 6;
+        short tv1 = TransactionVersion.TV_1.featureLevel();
+
+        // Step 1: a transactional write at the newer epoch, i.e. the epoch was already incremented
+        // (coordinator fail over, completed transaction, etc.).
+        MemoryRecords currentEpochRecords = MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, currentEpoch, 0,
+                new SimpleRecord("previous-key".getBytes(), "previous-value".getBytes()));
+        // TV1 = supportsEpochBump = false
+        VerificationGuard currentGuard = log.maybeStartTransactionVerification(producerId, 0, currentEpoch, false);
+        log.appendAsLeader(currentEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), currentGuard, tv1);
+
+        // Complete the transaction normally; the commit marker carries the current epoch.
+        MemoryRecords commitMarker = MemoryRecords.withEndTransactionMarker(
+                producerId, currentEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+        log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
+                VerificationGuard.SENTINEL, tv1);
+
+        // Step 2: the client still uses its cached, stale epoch.
+        MemoryRecords staleEpochRecords = MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, staleEpoch, 0,
+                new SimpleRecord("stale-epoch-key".getBytes(), "stale-epoch-value".getBytes()));
+
+        // Step 3: verification plus append with the stale epoch must raise the recoverable error.
+        InvalidProducerEpochException exception = assertThrows(InvalidProducerEpochException.class, () -> {
+            VerificationGuard staleGuard = log.maybeStartTransactionVerification(producerId, 0, staleEpoch, false);
+            log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), staleGuard, tv1);
+        });
+
+        // The message must describe the epoch mismatch and mention both epochs.
+        String message = exception.getMessage();
+        assertTrue(message.contains("smaller than the last seen epoch"));
+        assertTrue(message.contains(String.valueOf(staleEpoch)));
+        assertTrue(message.contains(String.valueOf(currentEpoch)));
+    }
+
+    /**
+     * TV2 scenario: a transaction is started at epoch 3, then an abort marker with the bumped
+     * epoch 4 is written. A later append at epoch 3 must fail with InvalidProducerEpochException
+     * (recoverable) rather than InvalidTxnStateException (fatal).
+     */
+    @Test
+    public void testStaleProducerEpochReturnsRecoverableErrorForTV2Clients() throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        long producerId = 456L;
+        short originalEpoch = 3;
+        short bumpedEpoch = 4;
+        short tv2 = TransactionVersion.TV_2.featureLevel();
+
+        // Step 1: start a transaction with epoch 3 (before the timeout).
+        MemoryRecords initialRecords = MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, originalEpoch, 0,
+                new SimpleRecord("ks-initial-key".getBytes(), "ks-initial-value".getBytes()));
+        // TV2 = supportsEpochBump = true
+        VerificationGuard initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true);
+        log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), initialGuard, tv2);
+
+        // Step 2: the coordinator times out and aborts the transaction.
+        // TV2 (KIP-890): the coordinator bumps the epoch from 3 to 4 and sends the abort marker with epoch 4.
+        MemoryRecords abortMarker = MemoryRecords.withEndTransactionMarker(
+                producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+        log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
+                VerificationGuard.SENTINEL, tv2);
+
+        // Step 3: the producer resumes with the stale epoch (timeout recovery scenario).
+        MemoryRecords staleEpochRecords = MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, originalEpoch, 0,
+                new SimpleRecord("ks-resume-key".getBytes(), "ks-resume-value".getBytes()));
+
+        // Step 4: verification plus append with the stale epoch must raise the recoverable error.
+        InvalidProducerEpochException exception = assertThrows(InvalidProducerEpochException.class, () -> {
+            // TV2 = supportsEpochBump = true
+            VerificationGuard staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true);
+            log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), staleGuard, tv2);
+        });
+
+        // The message must describe the epoch mismatch (3 < 4) and mention both epochs.
+        String message = exception.getMessage();
+        assertTrue(message.contains("smaller than the last seen epoch"));
+        assertTrue(message.contains(String.valueOf(originalEpoch)));
+        assertTrue(message.contains(String.valueOf(bumpedEpoch)));
+    }
 }

Reply via email to