This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ceb36f98f1 KAFKA-19752 Move UnifiedLogTest to storage module (#21761)
6ceb36f98f1 is described below

commit 6ceb36f98f1af6868251c0d89f9aca1250bd0ccf
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Mar 20 01:12:39 2026 +0800

    KAFKA-19752 Move UnifiedLogTest to storage module (#21761)
    
    Migrates the tests from testFetchEarliestPendingUploadTimestampNoRemoteStorage
    through testTransactionIndexUpdated.
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 792 +-----------------
 .../kafka/storage/internals/log/LogTestUtils.java  |  81 ++
 .../storage/internals/log/UnifiedLogTest.java      | 907 ++++++++++++++++++++-
 3 files changed, 991 insertions(+), 789 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b01375e59ae..65a41b2579c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -25,39 +25,32 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.record.internal.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
-import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
+import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, 
DelayedRemoteListOffsets}
-import org.apache.kafka.server.storage.log.{FetchIsolation, 
UnexpectedAppendOffsetException}
+import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{MockTime, Scheduler}
-import 
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, 
PartitionMetadataFile}
-import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
AsyncOffsetReader, LogConfig, LogFileUtils, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, 
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, 
LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
-import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyLong}
-import org.mockito.Mockito.{doAnswer, doThrow, spy}
-import org.apache.kafka.raft.KRaftConfigs
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{doThrow, spy}
 
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.util
-import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
-import java.util.{Optional, Properties}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.Optional
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
@@ -91,728 +84,6 @@ class UnifiedLogTest {
     }
   }
 
-  private def createKafkaConfigWithRLM: KafkaConfig = {
-    val props = new Properties()
-    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
-    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
-    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
-    props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
-    props.setProperty("listeners", "CONTROLLER://:9093")
-    props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
-    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
-    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteStorageManager].getName)
-    
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteLogMetadataManager].getName)
-    // set log reader threads number to 2
-    props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
-    KafkaConfig.fromProps(props)
-  }
-
-  @Test
-  def testFetchEarliestPendingUploadTimestampNoRemoteStorage(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1)
-    val log = createLog(logDir, logConfig)
-
-    // Test initial state before any records
-    assertFetchOffsetBySpecialTimestamp(log, None, new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-
-    // Append records
-    val _ = prepareLogWithSequentialRecords(log, recordCount = 2)
-
-    // Test state after records are appended
-    assertFetchOffsetBySpecialTimestamp(log, None, new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-  }
-
-  @Test
-  def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = {
-    val logStartOffset = 0
-    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, 
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
-    val (firstTimestamp, firstLeaderEpoch) = 
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
-    val (secondTimestamp, secondLeaderEpoch) = 
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
-    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, 
timestampAndEpochs(2).leaderEpoch)
-
-    doAnswer(ans => {
-      val timestamp = ans.getArgument(1).asInstanceOf[Long]
-      Optional.of(timestamp)
-        .filter(_ == timestampAndEpochs.head.timestamp)
-        .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, 
Optional.of(timestampAndEpochs.head.leaderEpoch)))
-    
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
-      anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
-    // Offset 0 (first timestamp) is in remote storage and deleted locally. 
Offset 1 (second timestamp) is in local storage.
-    log.updateLocalLogStartOffset(1)
-    log.updateHighestOffsetInRemoteStorage(0)
-
-    // In the assertions below we test that offset 0 (first timestamp) is only 
in remote and offset 1 (second timestamp) is in local storage.
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), 
firstTimestamp)
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), 
secondTimestamp)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, 
Optional.of(secondLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, 
Optional.of(thirdLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, 
Optional.of(secondLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-  }
-
-  @Test
-  def 
testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion(): Unit 
= {
-    val logStartOffset = 0
-    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, 
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
-    val (firstTimestamp, firstLeaderEpoch) = 
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
-    val (secondTimestamp, secondLeaderEpoch) = 
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
-    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, 
timestampAndEpochs(2).leaderEpoch)
-
-    // Offsets upto 1 are in remote storage
-    doAnswer(ans => {
-      val timestamp = ans.getArgument(1).asInstanceOf[Long]
-      Optional.of(
-        timestamp match {
-          case x if x == firstTimestamp => new TimestampAndOffset(x, 0L, 
Optional.of(firstLeaderEpoch))
-          case x if x == secondTimestamp => new TimestampAndOffset(x, 1L, 
Optional.of(secondLeaderEpoch))
-          case _ => null
-        }
-      )
-    
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
-      anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
-    // Offsets 0, 1 (first and second timestamps) are in remote storage and 
not deleted locally.
-    log.updateLocalLogStartOffset(0)
-    log.updateHighestOffsetInRemoteStorage(1)
-
-    // In the assertions below we test that offset 0 (first timestamp) and 
offset 1 (second timestamp) are on both remote and local storage
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), 
firstTimestamp)
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), 
secondTimestamp)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, 
Optional.of(secondLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, 
Optional.of(thirdLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, 
Optional.of(thirdLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-  }
-
-  @Test
-  def testFetchEarliestPendingUploadTimestampNoSegmentsUploaded(): Unit = {
-    val logStartOffset = 0
-    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, 
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
-    val (firstTimestamp, firstLeaderEpoch) = 
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
-    val (secondTimestamp, secondLeaderEpoch) = 
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
-    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, 
timestampAndEpochs(2).leaderEpoch)
-
-    // No offsets are in remote storage
-    doAnswer(_ => Optional.empty[TimestampAndOffset]())
-      
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
-        anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
-    // Offsets 0, 1, 2 (first, second and third timestamps) are in local 
storage only and not uploaded to remote storage.
-    log.updateLocalLogStartOffset(0)
-    log.updateHighestOffsetInRemoteStorage(-1)
-
-    // In the assertions below we test that offset 0 (first timestamp), offset 
1 (second timestamp) and offset 2 (third timestamp) are only on the local 
storage.
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), 
firstTimestamp)
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), 
secondTimestamp)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
-      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, 
Optional.of(thirdLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-  }
-
-  @Test
-  def testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote(): 
Unit = {
-    val logStartOffset = 100
-    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, 
timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
-
-    val (firstTimestamp, firstLeaderEpoch) = 
(timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
-    val (secondTimestamp, secondLeaderEpoch) = 
(timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
-    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, 
timestampAndEpochs(2).leaderEpoch)
-
-    // Offsets 100, 101, 102 (first, second and third timestamps) are in local 
storage and not uploaded to remote storage.
-    // Tiered storage copy was disabled and then enabled again, because of 
which the remote log segments are deleted but
-    // the highest offset in remote storage has become stale
-    doAnswer(_ => Optional.empty[TimestampAndOffset]())
-      
.when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
-        anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
-
-    log.updateLocalLogStartOffset(100)
-    log.updateHighestOffsetInRemoteStorage(50)
-
-    // In the assertions below we test that offset 100 (first timestamp), 
offset 101 (second timestamp) and offset 102 (third timestamp) are only on the 
local storage.
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))), 
firstTimestamp)
-    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new 
TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))), 
secondTimestamp)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, 
Optional.empty()),
-      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, 
Optional.of(thirdLeaderEpoch)),
-      ListOffsetsRequest.LATEST_TIMESTAMP)
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, 
Optional.of(firstLeaderEpoch)),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-  }
-
-  private def prepare(logStartOffset: Int): (RemoteLogManager, UnifiedLog, 
Seq[TimestampAndEpoch]) = {
-    val config: KafkaConfig = createKafkaConfigWithRLM
-    val purgatory = new 
DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", 
config.brokerId)
-    val remoteLogManager = spy(new 
RemoteLogManager(config.remoteLogManagerConfig,
-      0,
-      logDir.getAbsolutePath,
-      "clusterId",
-      mockTime,
-      _ => Optional.empty[UnifiedLog](),
-      (_, _) => {},
-      brokerTopicStats,
-      new Metrics(),
-      Optional.empty))
-    remoteLogManager.setDelayedOperationPurgatory(purgatory)
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1, remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, logStartOffset = logStartOffset, 
remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
-
-    // Verify earliest pending upload offset for empty log
-    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new 
TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset, 
Optional.empty()),
-      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
-
-    val timestampAndEpochs = prepareLogWithSequentialRecords(log, recordCount 
= 3)
-    (remoteLogManager, log, timestampAndEpochs)
-  }
-
-  /**
-   * Test the Log truncate operations
-   */
-  @Test
-  def testTruncateTo(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds)
-    val setSize = createRecords.sizeInBytes
-    val msgPerSeg = 10
-    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-
-    // create a log
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
-    for (_ <- 1 to msgPerSeg)
-      log.appendAsLeader(createRecords, 0)
-
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 
segments.")
-    assertEquals(msgPerSeg, log.logEndOffset, "Log end offset should be equal 
to number of messages")
-
-    val lastOffset = log.logEndOffset
-    val size = log.size
-    log.truncateTo(log.logEndOffset) // keep the entire log
-    assertEquals(lastOffset, log.logEndOffset, "Should not change offset")
-    assertEquals(size, log.size, "Should not change log size")
-    log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
-    assertEquals(lastOffset, log.logEndOffset, "Should not change offset but 
should log error")
-    assertEquals(size, log.size, "Should not change log size")
-    log.truncateTo(msgPerSeg/2) // truncate somewhere in between
-    assertEquals(log.logEndOffset, msgPerSeg/2, "Should change offset")
-    assertTrue(log.size < size, "Should change log size")
-    log.truncateTo(0) // truncate the entire log
-    assertEquals(0, log.logEndOffset, "Should change offset")
-    assertEquals(0, log.size, "Should change log size")
-
-    for (_ <- 1 to msgPerSeg)
-      log.appendAsLeader(createRecords, 0)
-
-    assertEquals(log.logEndOffset, lastOffset, "Should be back to original 
offset")
-    assertEquals(log.size, size, "Should be back to original size")
-    log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1), 
Optional.empty)
-    assertEquals(log.logEndOffset, lastOffset - (msgPerSeg - 1), "Should 
change offset")
-    assertEquals(log.size, 0, "Should change log size")
-
-    for (_ <- 1 to msgPerSeg)
-      log.appendAsLeader(createRecords, 0)
-
-    assertTrue(log.logEndOffset > msgPerSeg, "Should be ahead of to original 
offset")
-    assertEquals(size, log.size, "log size should be same as before")
-    log.truncateTo(0) // truncate before first start offset in the log
-    assertEquals(0, log.logEndOffset, "Should change offset")
-    assertEquals(log.size, 0, "Should change log size")
-  }
-
-  /**
-   * Verify that when we truncate a log the index of the last segment is 
resized to the max index size to allow more appends
-   */
-  @Test
-  def testIndexResizingAtTruncation(): Unit = {
-    val setSize = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds).sizeInBytes
-    val msgPerSeg = 10
-    val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentSize, 
indexIntervalBytes = setSize - 1)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
-    for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds + i), 0)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-
-    mockTime.sleep(msgPerSeg)
-    for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds + i), 0)
-    assertEquals(2, log.numberOfSegments, "There should be exactly 2 segment.")
-    val expectedEntries = msgPerSeg - 1
-
-    assertEquals(expectedEntries, 
log.logSegments.asScala.toList.head.offsetIndex.maxEntries,
-      s"The index of the first segment should have $expectedEntries entries")
-    assertEquals(expectedEntries, 
log.logSegments.asScala.toList.head.timeIndex.maxEntries,
-      s"The time index of the first segment should have $expectedEntries 
entries")
-
-    log.truncateTo(0)
-    assertEquals(1, log.numberOfSegments, "There should be exactly 1 segment.")
-    assertEquals(log.config.maxIndexSize/8, 
log.logSegments.asScala.toList.head.offsetIndex.maxEntries,
-      "The index of segment 1 should be resized to maxIndexSize")
-    assertEquals(log.config.maxIndexSize/12, 
log.logSegments.asScala.toList.head.timeIndex.maxEntries,
-      "The time index of segment 1 should be resized to maxIndexSize")
-
-    mockTime.sleep(msgPerSeg)
-    for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds + i), 0)
-    assertEquals(1, log.numberOfSegments,
-      "There should be exactly 1 segment.")
-  }
-
-  /**
-   * Test that deleted files are deleted after the appropriate time.
-   */
-  @Test
-  def testAsyncDelete(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = mockTime.milliseconds - 1000L)
-    val asyncDeleteMs = 1000
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 
10000,
-                                    retentionMs = 999, fileDeleteDelayMs = 
asyncDeleteMs)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 100)
-      log.appendAsLeader(createRecords, 0)
-
-    // files should be renamed
-    val segments = log.logSegments.asScala.toArray
-    val oldFiles = segments.map(_.log.file) ++ segments.map(_.offsetIndexFile)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be 
deleted")
-
-    assertEquals(1, log.numberOfSegments, "Only one segment should remain.")
-    
assertTrue(segments.forall(_.log.file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
 &&
-      
segments.forall(_.offsetIndexFile.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)),
-      "All log and index files should end in .deleted")
-    assertTrue(segments.forall(_.log.file.exists) && 
segments.forall(_.offsetIndexFile.exists),
-      "The .deleted files should still be there.")
-    assertTrue(oldFiles.forall(!_.exists), "The original file should be gone.")
-
-    // when enough time passes the files should be deleted
-    val deletedFiles = segments.map(_.log.file) ++ 
segments.map(_.offsetIndexFile)
-    mockTime.sleep(asyncDeleteMs + 1)
-    assertTrue(deletedFiles.forall(!_.exists), "Files should all be gone.")
-  }
-
-  @Test
-  def testAppendMessageWithNullPayload(): Unit = {
-    val log = createLog(logDir, new LogConfig(new Properties))
-    log.appendAsLeader(TestUtils.singletonRecords(value = null), 0)
-    val head = LogTestUtils.readLog(log, 0, 
4096).records.records.iterator.next()
-    assertEquals(0, head.offset)
-    assertFalse(head.hasValue, "Message payload should be null.")
-  }
-
-  @Test
-  def testAppendWithOutOfOrderOffsetsThrowsException(): Unit = {
-    val log = createLog(logDir, new LogConfig(new Properties))
-
-    val epoch = 0
-    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
-    val buffer = ByteBuffer.allocate(512)
-    for (offset <- appendOffsets) {
-      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
Compression.NONE,
-                                          TimestampType.LOG_APPEND_TIME, 
offset, mockTime.milliseconds(),
-                                          1L, 0, 0, false, epoch)
-      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
-      builder.close()
-    }
-    buffer.flip()
-    val memoryRecords = MemoryRecords.readableRecords(buffer)
-
-    assertThrows(
-      classOf[OffsetsOutOfOrderException],
-      () => log.appendAsFollower(memoryRecords, epoch)
-    )
-  }
-
-  @Test
-  def testAppendBelowExpectedOffsetThrowsException(): Unit = {
-    val log = createLog(logDir, new LogConfig(new Properties))
-    val records = (0 until 2).map(id => new 
SimpleRecord(id.toString.getBytes)).toArray
-    records.foreach(record => 
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), 0))
-
-    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
-    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
-    for (magic <- magicVals; compressionType <- compressionTypes) {
-      val compression = Compression.of(compressionType).build()
-      val invalidRecord = MemoryRecords.withRecords(magic, compression, new 
SimpleRecord(1.toString.getBytes))
-      assertThrows(
-        classOf[UnexpectedAppendOffsetException],
-        () => log.appendAsFollower(invalidRecord, Int.MaxValue),
-        () => s"Magic=$magic, compressionType=$compressionType"
-      )
-    }
-  }
-
-  @Test
-  def testAppendEmptyLogBelowLogStartOffsetThrowsException(): Unit = {
-    createEmptyLogs(logDir, 7)
-    val log = createLog(logDir, new LogConfig(new Properties), 
brokerTopicStats = brokerTopicStats)
-    assertEquals(7L, log.logStartOffset)
-    assertEquals(7L, log.logEndOffset)
-
-    val firstOffset = 4L
-    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
-    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
-    for (magic <- magicVals; compressionType <- compressionTypes) {
-      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes),
-                                         new SimpleRecord("k2".getBytes, 
"v2".getBytes),
-                                         new SimpleRecord("k3".getBytes, 
"v3".getBytes)),
-                                    magicValue = magic, codec = 
Compression.of(compressionType).build(),
-                                    baseOffset = firstOffset)
-
-      val exception = assertThrows(
-        classOf[UnexpectedAppendOffsetException],
-        () => log.appendAsFollower(batch, Int.MaxValue)
-      )
-      assertEquals(firstOffset, exception.firstOffset, s"Magic=$magic, 
compressionType=$compressionType, UnexpectedAppendOffsetException#firstOffset")
-      assertEquals(firstOffset + 2, exception.lastOffset, s"Magic=$magic, 
compressionType=$compressionType, UnexpectedAppendOffsetException#lastOffset")
-    }
-  }
-
-  @Test
-  def testAppendWithNoTimestamp(): Unit = {
-    val log = createLog(logDir, new LogConfig(new Properties))
-    log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
-      new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, 
"value".getBytes)), 0)
-  }
-
-  @Test
-  def testAppendToOrReadFromLogInFailedLogDir(): Unit = {
-    val pid = 1L
-    val epoch = 0.toShort
-    val log = createLog(logDir, new LogConfig(new Properties))
-    log.appendAsLeader(TestUtils.singletonRecords(value = null), 0)
-    assertEquals(0, LogTestUtils.readLog(log, 0, 
4096).records.records.iterator.next().offset)
-    val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, 
mockTime)
-    append(10)
-    // Kind of a hack, but renaming the index to a directory ensures that the 
append
-    // to the index will fail.
-    log.activeSegment.txnIndex.renameTo(log.dir)
-    assertThrows(classOf[KafkaStorageException], () => 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, 
ControlRecordType.ABORT,
-      mockTime.milliseconds(), coordinatorEpoch = 1, transactionVersion = 
TransactionVersion.TV_0.featureLevel()))
-    assertThrows(classOf[KafkaStorageException], () => 
log.appendAsLeader(TestUtils.singletonRecords(value = null), 0))
-    assertThrows(classOf[KafkaStorageException], () => 
LogTestUtils.readLog(log, 0, 4096).records.records.iterator.next().offset)
-  }
-
-  @Test
-  def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), 5)
-    assertEquals(Optional.of(5), log.latestEpoch)
-
-    // Ensure that after a directory rename, the epoch cache is written to the 
right location
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), 10)
-    assertEquals(Optional.of(10), log.latestEpoch)
-    assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
-    assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists())
-  }
-
-  @Test
-  def testTopicIdTransfersAfterDirectoryRename(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    // Write a topic ID to the partition metadata file to ensure it is 
transferred correctly.
-    val topicId = Uuid.randomUuid()
-    log.assignTopicId(topicId)
-
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), 5)
-    assertEquals(Optional.of(5), log.latestEpoch)
-
-    // Ensure that after a directory rename, the partition metadata file is 
written to the right location.
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), 10)
-    assertEquals(Optional.of(10), log.latestEpoch)
-    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
-    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
-
-    // Check the topic ID remains in memory and was copied correctly.
-    assertTrue(log.topicId.isPresent)
-    assertEquals(topicId, log.topicId.get)
-    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
-  }
-
-  @Test
-  def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    // Write a topic ID to the partition metadata file to ensure it is 
transferred correctly.
-    val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.get.record(topicId)
-
-    // Ensure that after a directory rename, the partition metadata file is 
written to the right location.
-    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
-    log.renameDir(UnifiedLog.logDeleteDirName(tp), true)
-    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
-    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
-
-    // Check the file holds the correct contents.
-    assertTrue(log.partitionMetadataFile.get.exists())
-    assertEquals(topicId, log.partitionMetadataFile.get.read().topicId)
-  }
-
-  @Test
-  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, 
indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes()))), 5)
-    assertEquals(Optional.of(5), log.leaderEpochCache.latestEpoch)
-
-    log.appendAsFollower(
-      TestUtils.records(
-        List(
-          new SimpleRecord("foo".getBytes())
-        ),
-        baseOffset = 1L,
-        magicValue = RecordVersion.V1.value
-      ),
-      5
-    )
-    assertEquals(Optional.empty, log.leaderEpochCache.latestEpoch)
-  }
-
-  @Test
-  def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000")
-    logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1")
-    logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536")
-    val logConfig = new LogConfig(logProps)
-    val log = createLog(logDir, logConfig)
-    log.appendAsLeaderWithRecordVersion(TestUtils.records(List(new 
SimpleRecord("bar".getBytes())),
-      magicValue = RecordVersion.V1.value), 5, RecordVersion.V1)
-    assertTrue(log.latestEpoch.isEmpty)
-
-    log.appendAsLeader(TestUtils.records(List(new 
SimpleRecord("foo".getBytes())),
-      magicValue = RecordVersion.V2.value), 5)
-    assertEquals(5, log.latestEpoch.get)
-  }
-
-  @Test
-  def testSplitOnOffsetOverflow(): Unit = {
-    // create a log such that one log segment has offsets that overflow, and 
call the split API on that segment
-    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
-    val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig)
-    assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must 
have offset overflow")
-
-    val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
-
-    // split the segment with overflow
-    log.splitOverflowedSegment(segmentWithOverflow)
-
-    // assert we were successfully able to split the segment
-    assertEquals(4, log.numberOfSegments)
-    UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
-
-    // verify we do not have offset overflow anymore
-    assertFalse(LogTestUtils.hasOffsetOverflow(log))
-  }
-
-  @Test
-  def testDegenerateSegmentSplit(): Unit = {
-    // This tests a scenario where all of the batches appended to a segment 
have overflowed.
-    // When we split the overflowed segment, only one new segment will be 
created.
-
-    val overflowOffset = Int.MaxValue + 1L
-    val batch1 = MemoryRecords.withRecords(overflowOffset, Compression.NONE, 0,
-      new SimpleRecord("a".getBytes))
-    val batch2 = MemoryRecords.withRecords(overflowOffset + 1, 
Compression.NONE, 0,
-      new SimpleRecord("b".getBytes))
-
-    testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L, 
List(batch1, batch2))
-  }
-
-  @Test
-  def testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset(): Unit = {
-    // Degenerate case where the only batch in the segment overflows. In this 
scenario,
-    // the first offset of the batch is valid, but the last overflows.
-
-    val firstBatchBaseOffset = Int.MaxValue - 1
-    val records = MemoryRecords.withRecords(firstBatchBaseOffset, 
Compression.NONE, 0,
-      new SimpleRecord("a".getBytes),
-      new SimpleRecord("b".getBytes),
-      new SimpleRecord("c".getBytes))
-
-    testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L, 
List(records))
-  }
-
-  private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, 
records: List[MemoryRecords]): Unit = {
-    val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
-    // Need to create the offset files explicitly to avoid triggering segment 
recovery to truncate segment.
-    Files.createFile(LogFileUtils.offsetIndexFile(logDir, 
segmentBaseOffset).toPath)
-    Files.createFile(LogFileUtils.timeIndexFile(logDir, 
segmentBaseOffset).toPath)
-    records.foreach(segment.append)
-    segment.close()
-
-    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, 
fileDeleteDelayMs = 1000)
-    val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
-
-    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
-      throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
-    }
-
-    val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
-    log.splitOverflowedSegment(segmentWithOverflow)
-
-    assertEquals(1, log.numberOfSegments)
-
-    val firstBatchBaseOffset = records.head.batches.asScala.head.baseOffset
-    assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset)
-    UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
-
-    assertFalse(LogTestUtils.hasOffsetOverflow(log))
-  }
-
-  def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache
-
-  @Test
-  def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val lastOffset = 50L
-
-    val producerEpoch = 0.toShort
-    val producerId = 15L
-    val appendProducer = LogTestUtils.appendTransactionalAsLeader(log, 
producerId, producerEpoch, mockTime)
-
-    // Thread 1 writes single-record transactions and attempts to read them
-    // before they have been aborted, and then aborts them
-    val txnWriteAndReadLoop: Callable[Int] = () => {
-      var nonEmptyReads = 0
-      while (log.logEndOffset < lastOffset) {
-        val currentLogEndOffset = log.logEndOffset
-
-        appendProducer(1)
-
-        val readInfo = log.read(currentLogEndOffset, Int.MaxValue, 
FetchIsolation.TXN_COMMITTED, false)
-
-        if (readInfo.records.sizeInBytes() > 0)
-          nonEmptyReads += 1
-
-        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, 
producerEpoch, ControlRecordType.ABORT,
-          mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel())
-      }
-      nonEmptyReads
-    }
-
-    // Thread 2 watches the log and updates the high watermark
-    val hwUpdateLoop: Runnable = () => {
-      while (log.logEndOffset < lastOffset) {
-        log.updateHighWatermark(log.logEndOffset)
-      }
-    }
-
-    val executor = Executors.newFixedThreadPool(2)
-    try {
-      executor.submit(hwUpdateLoop)
-
-      val future = executor.submit(txnWriteAndReadLoop)
-      val nonEmptyReads = future.get()
-
-      assertEquals(0, nonEmptyReads)
-    } finally {
-      executor.shutdownNow()
-    }
-  }
-
-
-  @Test
-  def testTransactionIndexUpdated(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 
5)
-    val log = createLog(logDir, logConfig)
-    val epoch = 0.toShort
-
-    val pid1 = 1L
-    val pid2 = 2L
-    val pid3 = 3L
-    val pid4 = 4L
-
-    val appendPid1 = LogTestUtils.appendTransactionalAsLeader(log, pid1, 
epoch, mockTime)
-    val appendPid2 = LogTestUtils.appendTransactionalAsLeader(log, pid2, 
epoch, mockTime)
-    val appendPid3 = LogTestUtils.appendTransactionalAsLeader(log, pid3, 
epoch, mockTime)
-    val appendPid4 = LogTestUtils.appendTransactionalAsLeader(log, pid4, 
epoch, mockTime)
-
-    // mix transactional and non-transactional data
-    appendPid1(5) // nextOffset: 5
-    LogTestUtils.appendNonTransactionalAsLeader(log, 3) // 8
-    appendPid2(2) // 10
-    appendPid1(4) // 14
-    appendPid3(3) // 17
-    LogTestUtils.appendNonTransactionalAsLeader(log, 2) // 19
-    appendPid1(10) // 29
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch, 
ControlRecordType.ABORT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel()) // 30
-    appendPid2(6) // 36
-    appendPid4(3) // 39
-    LogTestUtils.appendNonTransactionalAsLeader(log, 10) // 49
-    appendPid3(9) // 58
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid3, epoch, 
ControlRecordType.COMMIT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel()) // 59
-    appendPid4(8) // 67
-    appendPid2(7) // 74
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch, 
ControlRecordType.ABORT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel()) // 75
-    LogTestUtils.appendNonTransactionalAsLeader(log, 10) // 85
-    appendPid4(4) // 89
-    LogTestUtils.appendEndTxnMarkerAsLeader(log, pid4, epoch, 
ControlRecordType.COMMIT,
-      mockTime.milliseconds(), transactionVersion = 
TransactionVersion.TV_0.featureLevel()) // 90
-
-    val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
-    val expectedTransactions = List(
-      new AbortedTxn(pid1, 0L, 29L, 8L),
-      new AbortedTxn(pid2, 8L, 74L, 36L)
-    )
-    assertEquals(expectedTransactions, abortedTransactions)
-
-    // Verify caching of the segment position of the first unstable offset
-    log.updateHighWatermark(30L)
-    assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
-
-    log.updateHighWatermark(75L)
-    assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
-
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(Optional.empty, log.firstUnstableOffset)
-  }
 
   @Test
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
@@ -2443,53 +1714,6 @@ class UnifiedLogTest {
     log
   }
 
-  private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, 
LogSegment) = {
-    LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
-
-    val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
-    val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse 
{
-      throw new AssertionError("Failed to create log with a segment which has 
overflowed offsets")
-    }
-
-    (log, segmentWithOverflow)
-  }
-
-  private def assertFetchOffsetByTimestamp(log: UnifiedLog, 
remoteLogManagerOpt: Option[RemoteLogManager], expected: 
Option[TimestampAndOffset], timestamp: Long): Unit = {
-    val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
-    val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, 
remoteOffsetReader)
-    assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
-    offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
-    assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
-    
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
-    assertEquals(expected.get, 
offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
-  }
-
-  private def assertFetchOffsetBySpecialTimestamp(log: UnifiedLog, 
remoteLogManagerOpt: Option[RemoteLogManager], expected: TimestampAndOffset, 
timestamp: Long): Unit = {
-    val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
-    val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, 
remoteOffsetReader)
-    assertEquals(new OffsetResultHolder(expected), offsetResultHolder)
-  }
-
-  private def getRemoteOffsetReader(remoteLogManagerOpt: Option[Any]): 
Optional[AsyncOffsetReader] = {
-    remoteLogManagerOpt match {
-      case Some(remoteLogManager) => 
Optional.of(remoteLogManager.asInstanceOf[AsyncOffsetReader])
-      case None => Optional.empty[AsyncOffsetReader]()
-    }
-  }
-
-  private def prepareLogWithSequentialRecords(log: UnifiedLog, recordCount: 
Int): Seq[TimestampAndEpoch] = {
-    val firstTimestamp = mockTime.milliseconds()
-
-    (0 until recordCount).map { i =>
-      val timestampAndEpoch = TimestampAndEpoch(firstTimestamp + i, i)
-      log.appendAsLeader(
-        TestUtils.singletonRecords(value = TestUtils.randomBytes(10), 
timestamp = timestampAndEpoch.timestamp),
-        timestampAndEpoch.leaderEpoch
-      )
-      timestampAndEpoch
-    }
-  }
-
   case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
 
   @Test
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 92a1e8cf0d9..e9efb9e5af7 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -30,17 +30,21 @@ import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.common.RequestLocal;
+import org.apache.kafka.server.storage.log.FetchIsolation;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.LongFunction;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -196,6 +200,10 @@ public class LogTestUtils {
         return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
     }
 
+    public static MemoryRecords records(List<SimpleRecord> records, byte 
magicValue, long baseOffset) {
+        return records(records, magicValue, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
     public static void deleteProducerSnapshotFiles(File logDir) {
         Stream.of(logDir.listFiles())
                 .filter(f -> f.isFile() && 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
@@ -206,6 +214,79 @@ public class LogTestUtils {
         return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f 
-> f.offset).sorted().toList();
     }
 
+    public static FetchDataInfo readLog(UnifiedLog log,
+                                        long startOffset,
+                                        int maxLength,
+                                        FetchIsolation isolation,
+                                        boolean minOneMessage) throws 
IOException {
+        return log.read(startOffset, maxLength, isolation, minOneMessage);
+    }
+
+    public static FetchDataInfo readLog(UnifiedLog log, long startOffset, int 
maxLength) throws IOException {
+        return readLog(log, startOffset, maxLength, FetchIsolation.LOG_END, 
true);
+    }
+
+    public static boolean hasOffsetOverflow(UnifiedLog log) {
+        return firstOverflowSegment(log).isPresent();
+    }
+
+    public static Optional<LogSegment> firstOverflowSegment(UnifiedLog log) {
+        for (LogSegment segment : log.logSegments()) {
+            for (RecordBatch batch : segment.log().batches()) {
+                if (batch.lastOffset() > segment.baseOffset() + 
Integer.MAX_VALUE || batch.baseOffset() < segment.baseOffset()) {
+                    return Optional.of(segment);
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    public static FileRecords rawSegment(File logDir, long baseOffset) throws 
IOException {
+        return FileRecords.open(LogFileUtils.logFile(logDir, baseOffset));
+    }
+
+    /**
+     * Initialize the given log directory with a set of segments, one of which 
will have an
+     * offset which overflows the segment
+     */
+    public static void initializeLogDirWithOverflowedSegment(File logDir) 
throws IOException {
+        long nextOffset = 0L;
+        nextOffset = writeNormalSegment(logDir, nextOffset);
+        nextOffset = writeOverflowSegment(logDir, nextOffset);
+        writeNormalSegment(logDir, nextOffset);
+    }
+
+    private static long writeSampleBatches(File logDir, long baseOffset, 
FileRecords segment) throws IOException {
+        LongFunction<SimpleRecord> record = offset -> {
+            byte[] data = Long.toString(offset).getBytes();
+            return new SimpleRecord(data, data);
+        };
+        segment.append(MemoryRecords.withRecords(baseOffset, Compression.NONE, 
0,
+            record.apply(baseOffset)));
+        segment.append(MemoryRecords.withRecords(baseOffset + 1, 
Compression.NONE, 0,
+            record.apply(baseOffset + 1),
+            record.apply(baseOffset + 2)));
+        segment.append(MemoryRecords.withRecords(baseOffset + 
Integer.MAX_VALUE - 1, Compression.NONE, 0,
+            record.apply(baseOffset + Integer.MAX_VALUE - 1)));
+        // Need to create the offset files explicitly to avoid triggering 
segment recovery to truncate segment.
+        Files.createFile(LogFileUtils.offsetIndexFile(logDir, 
baseOffset).toPath());
+        Files.createFile(LogFileUtils.timeIndexFile(logDir, 
baseOffset).toPath());
+        return baseOffset + Integer.MAX_VALUE;
+    }
+
+    private static long writeNormalSegment(File logDir, long baseOffset) 
throws IOException {
+        try (FileRecords segment = rawSegment(logDir, baseOffset)) {
+            return writeSampleBatches(logDir, baseOffset, segment);
+        }
+    }
+
+    private static long writeOverflowSegment(File logDir, long baseOffset) 
throws IOException {
+        try (FileRecords segment = rawSegment(logDir, baseOffset)) {
+            long nextOffset = writeSampleBatches(logDir, baseOffset, segment);
+            return writeSampleBatches(logDir, nextOffset, segment);
+        }
+    }
+
     public static void appendNonTransactionalAsLeader(UnifiedLog log, int 
numRecords) throws IOException {
         List<SimpleRecord> simpleRecords = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index a0a207c512a..a9aa710c701 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.record.internal.CompressionType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.DefaultRecordBatch;
 import org.apache.kafka.common.record.internal.FileRecords;
@@ -40,6 +42,7 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.record.internal.RecordVersion;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -60,6 +63,8 @@ import 
org.apache.kafka.server.storage.log.UnexpectedAppendOffsetException;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.utils.Throttler;
 import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
@@ -72,8 +77,10 @@ import com.yammer.metrics.core.Meter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
@@ -92,7 +99,11 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
@@ -101,7 +112,9 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -128,8 +141,9 @@ public class UnifiedLogTest {
     private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
     private final ProducerStateManagerConfig producerStateManagerConfig = new 
ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
     private final List<UnifiedLog> logsToClose = new ArrayList<>();
-
+    private RemoteLogManager remoteLogManager;
     private UnifiedLog log;
+    private List<TimestampAndEpoch> timestampAndEpochs;
 
     @AfterEach
     public void tearDown() throws IOException {
@@ -2391,6 +2405,11 @@ public class UnifiedLogTest {
                 producerStateManagerConfig, true, Optional.empty(), 
remoteStorageSystemEnable);
     }
 
+    private UnifiedLog createLog(File dir, LogConfig config, long 
recoveryPoint) throws IOException {
+        return createLog(dir, config, 0L, recoveryPoint, brokerTopicStats, 
mockTime.scheduler, mockTime,
+                producerStateManagerConfig, true, Optional.empty(), false);
+    }
+
     private UnifiedLog createLog(
             File dir,
             LogConfig config,
@@ -2445,10 +2464,6 @@ public class UnifiedLogTest {
         return log;
     }
 
-    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
-        return singletonRecords(value, key, Compression.NONE, 
RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
-    }
-
     public static MemoryRecords singletonRecords(byte[] value, long timestamp) 
{
         return singletonRecords(value, null, Compression.NONE, timestamp, 
RecordBatch.CURRENT_MAGIC_VALUE);
     }
@@ -3407,6 +3422,888 @@ public class UnifiedLogTest {
                 
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Optional.of(remoteLogManager)));
     }
 
+    /**
+     * With no remote log manager supplied, EARLIEST_PENDING_UPLOAD_TIMESTAMP is expected to resolve to the
+     * unknown sentinel both on an empty log and after records have been appended.
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampNoRemoteStorage() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .build());
+        FileRecords.TimestampAndOffset unknownOffset =
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1));
+
+        // Empty log
+        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(), unknownOffset,
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
+        prepareLogWithSequentialRecords(log, 2);
+
+        // Log with records: the answer is still the sentinel
+        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(), unknownOffset,
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+    /**
+     * Offset 0 has been tiered and removed locally while offset 1 is still local: the earliest pending
+     * upload offset is expected to be the local log start offset (1).
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampWithRemoteStorage() throws Exception {
+        prepare(0);
+
+        TimestampAndEpoch first = timestampAndEpochs.get(0);
+        TimestampAndEpoch second = timestampAndEpochs.get(1);
+        TimestampAndEpoch third = timestampAndEpochs.get(2);
+        Optional<RemoteLogManager> rlm = Optional.of(remoteLogManager);
+
+        // The mocked remote storage can only resolve the first timestamp (offset 0).
+        doAnswer(invocation -> {
+            long ts = invocation.getArgument(1);
+            return ts == first.timestamp
+                ? Optional.of(new FileRecords.TimestampAndOffset(ts, 0L, Optional.of(first.leaderEpoch)))
+                : Optional.empty();
+        }).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()),
+                anyLong(),
+                anyLong(),
+                eq(log.leaderEpochCache()));
+
+        // Offset 0 exists only remotely; offset 1 is the first locally retained offset.
+        log.updateLocalLogStartOffset(1);
+        log.updateHighestOffsetInRemoteStorage(0);
+
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(first.timestamp, 0L, Optional.of(first.leaderEpoch))),
+            first.timestamp);
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(second.timestamp, 1L, Optional.of(second.leaderEpoch))),
+            second.timestamp);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(second.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(third.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(second.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+    /**
+     * Offsets 0 and 1 have been tiered but are still present locally: the earliest pending upload offset is
+     * expected to be the first offset after the highest tiered one (2).
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion() throws Exception {
+        prepare(0);
+
+        TimestampAndEpoch first = timestampAndEpochs.get(0);
+        TimestampAndEpoch second = timestampAndEpochs.get(1);
+        TimestampAndEpoch third = timestampAndEpochs.get(2);
+        Optional<RemoteLogManager> rlm = Optional.of(remoteLogManager);
+
+        // The mocked remote storage resolves the first two timestamps (offsets 0 and 1).
+        doAnswer(invocation -> {
+            long ts = invocation.getArgument(1);
+            if (ts == first.timestamp) {
+                return Optional.of(new FileRecords.TimestampAndOffset(ts, 0L, Optional.of(first.leaderEpoch)));
+            }
+            if (ts == second.timestamp) {
+                return Optional.of(new FileRecords.TimestampAndOffset(ts, 1L, Optional.of(second.leaderEpoch)));
+            }
+            return Optional.empty();
+        }).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()),
+                anyLong(),
+                anyLong(),
+                eq(log.leaderEpochCache()));
+
+        // Offsets 0 and 1 are tiered and nothing has been deleted locally yet.
+        log.updateLocalLogStartOffset(0);
+        log.updateHighestOffsetInRemoteStorage(1);
+
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(first.timestamp, 0L, Optional.of(first.leaderEpoch))),
+            first.timestamp);
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(second.timestamp, 1L, Optional.of(second.leaderEpoch))),
+            second.timestamp);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(second.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(third.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(third.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+    /**
+     * Nothing has been tiered (highest remote offset is -1): the earliest pending upload offset is expected
+     * to be the local log start offset (0).
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampNoSegmentsUploaded() throws Exception {
+        prepare(0);
+
+        TimestampAndEpoch first = timestampAndEpochs.get(0);
+        TimestampAndEpoch second = timestampAndEpochs.get(1);
+        TimestampAndEpoch third = timestampAndEpochs.get(2);
+        Optional<RemoteLogManager> rlm = Optional.of(remoteLogManager);
+
+        // The mocked remote storage never resolves any timestamp.
+        doAnswer(invocation -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()),
+                anyLong(),
+                anyLong(),
+                eq(log.leaderEpochCache()));
+
+        // Offsets 0, 1 and 2 exist only in local storage.
+        log.updateLocalLogStartOffset(0);
+        log.updateHighestOffsetInRemoteStorage(-1);
+
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(first.timestamp, 0L, Optional.of(first.leaderEpoch))),
+            first.timestamp);
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(second.timestamp, 1L, Optional.of(second.leaderEpoch))),
+            second.timestamp);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
+            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(third.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+    /**
+     * The highest offset in remote storage (50) is stale and below the local log start offset (100): the
+     * earliest pending upload offset is expected to be the local log start offset.
+     */
+    @Test
+    public void testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote() throws Exception {
+        prepare(100);
+
+        TimestampAndEpoch first = timestampAndEpochs.get(0);
+        TimestampAndEpoch second = timestampAndEpochs.get(1);
+        TimestampAndEpoch third = timestampAndEpochs.get(2);
+        Optional<RemoteLogManager> rlm = Optional.of(remoteLogManager);
+
+        // Scenario: tiered storage copy was disabled and then re-enabled, so the remote segments were deleted
+        // but the recorded highest remote offset was left behind. Offsets 100-102 are only in local storage.
+        doAnswer(invocation -> Optional.empty()).when(remoteLogManager).findOffsetByTimestamp(
+                eq(log.topicPartition()),
+                anyLong(),
+                anyLong(),
+                eq(log.leaderEpochCache()));
+
+        log.updateLocalLogStartOffset(100);
+        log.updateHighestOffsetInRemoteStorage(50);
+
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(first.timestamp, 100L, Optional.of(first.leaderEpoch))),
+            first.timestamp);
+        assertFetchOffsetByTimestamp(log, rlm,
+            Optional.of(new FileRecords.TimestampAndOffset(second.timestamp, 101L, Optional.of(second.leaderEpoch))),
+            second.timestamp);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()),
+            ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(third.leaderEpoch)),
+            ListOffsetsRequest.LATEST_TIMESTAMP);
+        assertFetchOffsetBySpecialTimestamp(log, rlm,
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(first.leaderEpoch)),
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+    }
+
+    /**
+     * Exercises {@code truncateTo} and {@code truncateFullyAndStartAt}. Truncating at or beyond the log end
+     * offset leaves the log unchanged. Truncating in the middle shrinks it. Re-appending after a full
+     * truncation restores the original end offset and size.
+     */
+    @Test
+    public void testTruncateTo() throws IOException {
+        int recordSetSize = singletonRecords("test".getBytes(), mockTime.milliseconds()).sizeInBytes();
+        int messagesPerSegment = 10;
+        // Size the segment so that exactly ten single-record batches fit in it.
+        int segmentSize = messagesPerSegment * recordSetSize;
+
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        for (int n = 0; n < messagesPerSegment; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        }
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segments.");
+        assertEquals(messagesPerSegment, log.logEndOffset(), "Log end offset should be equal to number of messages");
+
+        long originalEndOffset = log.logEndOffset();
+        long originalSize = log.size();
+
+        // Truncating exactly at the end keeps the entire log.
+        log.truncateTo(log.logEndOffset());
+        assertEquals(originalEndOffset, log.logEndOffset(), "Should not change offset");
+        assertEquals(originalSize, log.size(), "Should not change log size");
+
+        // Truncating beyond the last offset is ignored.
+        log.truncateTo(log.logEndOffset() + 1);
+        assertEquals(originalEndOffset, log.logEndOffset(), "Should not change offset but should log error");
+        assertEquals(originalSize, log.size(), "Should not change log size");
+
+        // Truncating in the middle drops the tail.
+        log.truncateTo(messagesPerSegment / 2);
+        assertEquals(messagesPerSegment / 2, log.logEndOffset(), "Should change offset");
+        assertTrue(log.size() < originalSize, "Should change log size");
+
+        // Truncating to zero empties the log.
+        log.truncateTo(0);
+        assertEquals(0, log.logEndOffset(), "Should change offset");
+        assertEquals(0, log.size(), "Should change log size");
+
+        for (int n = 0; n < messagesPerSegment; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        }
+        assertEquals(originalEndOffset, log.logEndOffset(), "Should be back to original offset");
+        assertEquals(originalSize, log.size(), "Should be back to original size");
+
+        log.truncateFullyAndStartAt(log.logEndOffset() - (messagesPerSegment - 1), Optional.empty());
+        assertEquals(originalEndOffset - (messagesPerSegment - 1), log.logEndOffset(), "Should change offset");
+        assertEquals(0, log.size(), "Should change log size");
+
+        for (int n = 0; n < messagesPerSegment; n++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds()), 0);
+        }
+        assertTrue(log.logEndOffset() > messagesPerSegment, "Should be ahead of to original offset");
+        assertEquals(originalSize, log.size(), "log size should be same as before");
+
+        // Truncating below the first offset in the log empties it as well.
+        log.truncateTo(0);
+        assertEquals(0, log.logEndOffset(), "Should change offset");
+        assertEquals(0, log.size(), "Should change log size");
+    }
+
+    /**
+     * After truncation, the offset and time indexes of the remaining segment must be resized from their
+     * trimmed size back to the size derived from {@code maxIndexSize}, so that further appends fit.
+     */
+    @Test
+    public void testIndexResizingAtTruncation() throws IOException {
+        int recordSetSize = singletonRecords("test".getBytes(), mockTime.milliseconds()).sizeInBytes();
+        int messagesPerSegment = 10;
+        int segmentSize = messagesPerSegment * recordSetSize; // ten messages per segment
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(segmentSize)
+                .indexIntervalBytes(recordSetSize - 1)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        for (int i = 1; i <= messagesPerSegment; i++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + i), 0);
+        }
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+
+        mockTime.sleep(messagesPerSegment);
+        for (int i = 1; i <= messagesPerSegment; i++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + i), 0);
+        }
+        assertEquals(2, log.numberOfSegments(), "There should be exactly 2 segment.");
+
+        int expectedEntries = messagesPerSegment - 1;
+        LogSegment firstSegment = log.logSegments().iterator().next();
+        assertEquals(expectedEntries, firstSegment.offsetIndex().maxEntries(),
+            "The index of the first segment should have " + expectedEntries + " entries");
+        assertEquals(expectedEntries, firstSegment.timeIndex().maxEntries(),
+            "The time index of the first segment should have " + expectedEntries + " entries");
+
+        log.truncateTo(0);
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+        LogSegment remainingSegment = log.logSegments().iterator().next();
+        assertEquals(log.config().maxIndexSize / 8, remainingSegment.offsetIndex().maxEntries(),
+            "The index of segment 1 should be resized to maxIndexSize");
+        assertEquals(log.config().maxIndexSize / 12, remainingSegment.timeIndex().maxEntries(),
+            "The time index of segment 1 should be resized to maxIndexSize");
+
+        mockTime.sleep(messagesPerSegment);
+        for (int i = 1; i <= messagesPerSegment; i++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() + i), 0);
+        }
+        assertEquals(1, log.numberOfSegments(), "There should be exactly 1 segment.");
+    }
+
+    /**
+     * Segments removed by {@code deleteOldSegments} are first renamed with the deleted-file suffix. They are
+     * only removed from disk once the configured file delete delay has elapsed on the mock clock.
+     */
+    @Test
+    public void testAsyncDelete() throws IOException {
+        int asyncDeleteMs = 1000;
+        int recordSetSize = singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L).sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSetSize * 5)
+                .segmentIndexBytes(1000)
+                .indexIntervalBytes(10000)
+                .retentionMs(999)
+                .fileDeleteDelayMs(asyncDeleteMs)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        // Append enough old-timestamped messages to roll several segments.
+        for (int i = 0; i < 100; i++) {
+            log.appendAsLeader(singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000L), 0);
+        }
+
+        // Snapshot the segments and their log/index files before any rename happens.
+        List<LogSegment> segments = new ArrayList<>(log.logSegments());
+        List<File> oldFiles = segments.stream()
+            .flatMap(s -> Stream.of(s.log().file(), s.offsetIndexFile()))
+            .collect(Collectors.toList());
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+
+        assertEquals(1, log.numberOfSegments(), "Only one segment should remain.");
+        assertTrue(segments.stream().allMatch(s ->
+                s.log().file().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)
+                    && s.offsetIndexFile().getName().endsWith(LogFileUtils.DELETED_FILE_SUFFIX)),
+            "All log and index files should end in .deleted");
+        assertTrue(segments.stream().allMatch(s -> s.log().file().exists() && s.offsetIndexFile().exists()),
+            "The .deleted files should still be there.");
+        assertTrue(oldFiles.stream().noneMatch(File::exists), "The original file should be gone.");
+
+        // Once the delete delay passes, the renamed files must be gone too.
+        List<File> deletedFiles = segments.stream()
+            .flatMap(s -> Stream.of(s.log().file(), s.offsetIndexFile()))
+            .collect(Collectors.toList());
+        mockTime.sleep(asyncDeleteMs + 1);
+        assertTrue(deletedFiles.stream().noneMatch(File::exists), "Files should all be gone.");
+    }
+
+    /**
+     * A record with a null key and null value can be appended and read back as a value-less record at offset 0.
+     */
+    @Test
+    public void testAppendMessageWithNullPayload() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        MemoryRecords nullPayload = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(null, null));
+        log.appendAsLeader(nullPayload, 0);
+
+        Iterable<Record> readBack = LogTestUtils.readLog(log, 0, 4096).records.records();
+        Record first = readBack.iterator().next();
+        assertEquals(0, first.offset());
+        assertFalse(first.hasValue(), "Message payload should be null.");
+    }
+
+    /**
+     * A follower append whose batches have non-monotonic base offsets (0, 1, 3, 2, 4) must be rejected with
+     * {@link OffsetsOutOfOrderException}.
+     */
+    @Test
+    public void testAppendWithOutOfOrderOffsetsThrowsException() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        int epoch = 0;
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        // Write one single-record batch per base offset, back to back, into the same buffer.
+        for (long baseOffset : new long[] {0L, 1L, 3L, 2L, 4L}) {
+            MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, baseOffset, mockTime.milliseconds(), 1L, (short) 0, 0, false, epoch);
+            batchBuilder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+            batchBuilder.close();
+        }
+        buffer.flip();
+
+        MemoryRecords outOfOrder = MemoryRecords.readableRecords(buffer);
+        assertThrows(OffsetsOutOfOrderException.class, () -> log.appendAsFollower(outOfOrder, epoch));
+    }
+
+    /**
+     * Parameter source: every combination of record magic V0/V1/V2 with the NONE and LZ4 compression types,
+     * ordered by magic first.
+     */
+    private static Stream<Arguments> magicAndCompressionTypes() {
+        List<Arguments> combinations = new ArrayList<>();
+        for (byte magic : new byte[] {RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2}) {
+            for (CompressionType compressionType : List.of(CompressionType.NONE, CompressionType.LZ4)) {
+                combinations.add(Arguments.of(magic, compressionType));
+            }
+        }
+        return combinations.stream();
+    }
+
+    /**
+     * After two leader appends, a follower batch built with {@code MemoryRecords.withRecords} (no explicit
+     * base offset) is expected to be rejected with {@link UnexpectedAppendOffsetException} for every
+     * magic/compression combination.
+     */
+    @ParameterizedTest(name = "magic={0}, compressionType={1}")
+    @MethodSource("magicAndCompressionTypes")
+    public void testAppendBelowExpectedOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("0".getBytes())), 0);
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("1".getBytes())), 0);
+
+        MemoryRecords invalidRecord = MemoryRecords.withRecords(magic, Compression.of(compressionType).build(),
+            new SimpleRecord("1".getBytes()));
+        assertThrows(UnexpectedAppendOffsetException.class,
+            () -> log.appendAsFollower(invalidRecord, Integer.MAX_VALUE));
+    }
+
+    /**
+     * On an empty log whose start offset is 7, a follower batch spanning offsets 4-6 must be rejected. The
+     * exception must report the batch's first and last offsets.
+     */
+    @ParameterizedTest(name = "magic={0}, compressionType={1}")
+    @MethodSource("magicAndCompressionTypes")
+    public void testAppendEmptyLogBelowLogStartOffsetThrowsException(byte magic, CompressionType compressionType) throws IOException {
+        createEmptyLogs(logDir, 7);
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        assertEquals(7L, log.logStartOffset());
+        assertEquals(7L, log.logEndOffset());
+
+        long firstOffset = 4L;
+        List<SimpleRecord> records = List.of(
+            new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+            new SimpleRecord("k2".getBytes(), "v2".getBytes()),
+            new SimpleRecord("k3".getBytes(), "v3".getBytes()));
+        long lastOffset = firstOffset + records.size() - 1;
+        MemoryRecords batch = LogTestUtils.records(records, magic, Compression.of(compressionType).build(),
+            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+            firstOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+
+        UnexpectedAppendOffsetException exception = assertThrows(UnexpectedAppendOffsetException.class,
+            () -> log.appendAsFollower(batch, Integer.MAX_VALUE));
+        assertEquals(firstOffset, exception.firstOffset, "UnexpectedAppendOffsetException#firstOffset");
+        assertEquals(lastOffset, exception.lastOffset, "UnexpectedAppendOffsetException#lastOffset");
+    }
+
+    /**
+     * Appending a record whose timestamp is {@link RecordBatch#NO_TIMESTAMP} as leader must succeed, and the
+     * record must actually be appended.
+     */
+    @Test
+    public void testAppendWithNoTimestamp() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes(), "value".getBytes()));
+        // Make the "no exception" expectation explicit instead of relying on an assertion-free test body.
+        assertDoesNotThrow(() -> log.appendAsLeader(records, 0));
+        // The single record must have been written at offset 0, moving the log end offset to 1.
+        assertEquals(1L, log.logEndOffset());
+    }
+
+    /**
+     * Once an I/O failure is forced on the active segment's transaction index, later appends (end-txn marker
+     * and regular records) and reads are all expected to fail with {@link KafkaStorageException}.
+     */
+    @Test
+    public void testAppendToOrReadFromLogInFailedLogDir() throws IOException {
+        long producerId = 1L;
+        short producerEpoch = 0;
+        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+        log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0);
+        long firstOffset = LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset();
+        assertEquals(0, firstOffset);
+        LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch, mockTime).accept(10);
+
+        // A deliberate hack: renaming the transaction index onto a directory guarantees the next write to it fails.
+        log.activeSegment().txnIndex().renameTo(log.dir());
+
+        assertThrows(KafkaStorageException.class, () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, producerEpoch,
+            ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertThrows(KafkaStorageException.class, () -> log.appendAsLeader(LogTestUtils.singletonRecords(null, null), 0));
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.readLog(log, 0, 4096).records.records().iterator().next().offset());
+    }
+
+    /**
+     * After the log directory is renamed, new leader epochs must be checkpointed under the renamed directory
+     * and not under the original one.
+     */
+    @Test
+    public void testWriteLeaderEpochCheckpointAfterDirectoryRename() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1000)
+                .indexIntervalBytes(1)
+                .maxMessageBytes(64 * 1024)
+                .build());
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+        assertEquals(Optional.of(5), log.latestEpoch());
+
+        // Rename to the directory name produced by logDeleteDirName, then append under a new epoch.
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        log.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true);
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
+        assertEquals(Optional.of(10), log.latestEpoch());
+
+        assertTrue(LeaderEpochCheckpointFile.newFile(log.dir()).exists());
+        assertFalse(LeaderEpochCheckpointFile.newFile(logDir).exists());
+    }
+
+    /**
+     * A topic ID assigned before a directory rename must survive it. The partition metadata file must be
+     * present only under the renamed directory, and both the in-memory and persisted IDs must match.
+     */
+    @Test
+    public void testTopicIdTransfersAfterDirectoryRename() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1000)
+                .indexIntervalBytes(1)
+                .maxMessageBytes(64 * 1024)
+                .build());
+
+        // Assign a topic ID up front so there is partition metadata to carry across the rename.
+        Uuid expectedTopicId = Uuid.randomUuid();
+        log.assignTopicId(expectedTopicId);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+        assertEquals(Optional.of(5), log.latestEpoch());
+
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        log.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true);
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 10);
+        assertEquals(Optional.of(10), log.latestEpoch());
+
+        // The metadata file now lives only under the renamed directory.
+        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+        assertFalse(PartitionMetadataFile.newFile(logDir).exists());
+
+        // Both the in-memory and the persisted topic ID must be the one assigned above.
+        assertTrue(log.topicId().isPresent());
+        assertEquals(expectedTopicId, log.topicId().get());
+        assertEquals(expectedTopicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    /**
+     * A topic ID recorded directly on the partition metadata file (not via {@code assignTopicId}) must be
+     * flushed and readable from the renamed directory after {@code renameDir}.
+     */
+    @Test
+    public void testTopicIdFlushesBeforeDirectoryRename() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1000)
+                .indexIntervalBytes(1)
+                .maxMessageBytes(64 * 1024)
+                .build());
+
+        Uuid topicId = Uuid.randomUuid();
+        log.partitionMetadataFile().get().record(topicId);
+
+        TopicPartition topicPartition = UnifiedLog.parseTopicPartitionName(log.dir());
+        log.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true);
+
+        // The metadata file must now exist only under the renamed directory...
+        assertTrue(PartitionMetadataFile.newFile(log.dir()).exists());
+        assertFalse(PartitionMetadataFile.newFile(logDir).exists());
+
+        // ...and must contain the recorded topic ID.
+        assertTrue(log.partitionMetadataFile().get().exists());
+        assertEquals(topicId, log.partitionMetadataFile().get().read().topicId());
+    }
+
+    /**
+     * A leader append records epoch 5 in the leader epoch cache. A subsequent follower append of a
+     * RecordVersion.V1 batch must leave the cache without a latest epoch.
+     */
+    @Test
+    public void testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1000)
+                .indexIntervalBytes(1)
+                .maxMessageBytes(64 * 1024)
+                .build());
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+        assertEquals(Optional.of(5), log.leaderEpochCache().latestEpoch());
+
+        MemoryRecords v1Batch = LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes())), RecordVersion.V1.value, 1L);
+        log.appendAsFollower(v1Batch, 5);
+        assertEquals(Optional.empty(), log.leaderEpochCache().latestEpoch());
+    }
+
+    /**
+     * Appends a batch with record version V1 as leader, after which the log must report no latest epoch, then
+     * a batch built without an explicit magic value as leader at epoch 5, after which the latest epoch must be 5.
+     */
+    @Test
+    public void testLeaderEpochCacheCreatedAfterMessageFormatUpgrade() throws IOException {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "1000");
+        logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+        logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "65536");
+        LogConfig logConfig = new LogConfig(logProps);
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeaderWithRecordVersion(
+            LogTestUtils.records(List.of(new SimpleRecord("bar".getBytes())), RecordBatch.MAGIC_VALUE_V1, 0L),
+            5, RecordVersion.V1);
+        // Compare Optionals directly so a failure reports the actual value rather than a bare
+        // assertTrue failure (or a NoSuchElementException from Optional.get()).
+        assertEquals(Optional.empty(), log.latestEpoch());
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new SimpleRecord("foo".getBytes()))), 5);
+        assertEquals(Optional.of(5), log.latestEpoch());
+    }
+
+    /**
+     * Opens a log containing a segment with overflowed offsets, splits that segment, and checks that the result
+     * has four segments holding the same records and no remaining overflow.
+     */
+    @Test
+    public void testSplitOnOffsetOverflow() throws IOException {
+        // createLogWithOverflow stores the opened log in the `log` field, which the rest of this test uses.
+        LogSegment overflowedSegment = createLogWithOverflow(new LogTestUtils.LogConfigBuilder()
+                .indexIntervalBytes(1)
+                .fileDeleteDelayMs(1000)
+                .build());
+        assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must have offset overflow");
+        List<Record> recordsBeforeSplit = allRecords(log);
+
+        log.splitOverflowedSegment(overflowedSegment);
+
+        // Same records, spread over four segments, and no overflow left anywhere.
+        assertEquals(4, log.numberOfSegments());
+        verifyRecordsInLog(log, recordsBeforeSplit);
+        assertFalse(LogTestUtils.hasOffsetOverflow(log));
+    }
+
+    /**
+     * Every batch written to a segment based at offset 0 lies beyond Integer.MAX_VALUE, so splitting the
+     * overflowed segment is expected to produce exactly one new segment.
+     */
+    @Test
+    public void testDegenerateSegmentSplit() throws IOException {
+        long firstOverflowedOffset = Integer.MAX_VALUE + 1L;
+        List<MemoryRecords> overflowedBatches = List.of(
+            MemoryRecords.withRecords(firstOverflowedOffset, Compression.NONE, 0, new SimpleRecord("a".getBytes())),
+            MemoryRecords.withRecords(firstOverflowedOffset + 1, Compression.NONE, 0, new SimpleRecord("b".getBytes()))
+        );
+        testDegenerateSplitSegmentWithOverflow(0L, overflowedBatches);
+    }
+
+    @Test
+    public void testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset() 
throws IOException {
+        // Degenerate case where the only batch in the segment overflows. In 
this scenario,
+        // the first offset of the batch is valid, but the last overflows.
+
+        long firstBatchBaseOffset = (long) Integer.MAX_VALUE - 1;
+        MemoryRecords records = 
MemoryRecords.withRecords(firstBatchBaseOffset, Compression.NONE, 0,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes()));
+
+        testDegenerateSplitSegmentWithOverflow(0L, List.of(records));
+    }
+
+    @Test
+    public void testReadCommittedWithConcurrentHighWatermarkUpdates() throws 
Exception {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024 * 1024 * 5)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long lastOffset = 50L;
+
+        short producerEpoch = 0;
+        long producerId = 15L;
+        Consumer<Integer> appendProducer = 
LogTestUtils.appendTransactionalAsLeader(log, producerId, producerEpoch, 
mockTime);
+
+        // Thread 1 writes single-record transactions and attempts to read them
+        // before they have been aborted, and then aborts them
+        Callable<Integer> txnWriteAndReadLoop = () -> {
+            int nonEmptyReads = 0;
+            while (log.logEndOffset() < lastOffset) {
+                long currentLogEndOffset = log.logEndOffset();
+
+                appendProducer.accept(1);
+
+                FetchDataInfo readInfo = log.read(currentLogEndOffset, 
Integer.MAX_VALUE, FetchIsolation.TXN_COMMITTED, false);
+
+                if (readInfo.records.sizeInBytes() > 0)
+                    nonEmptyReads += 1;
+
+                LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, 
producerEpoch, ControlRecordType.ABORT,
+                    mockTime.milliseconds(), 0, 0, 
TransactionVersion.TV_0.featureLevel());
+            }
+            return nonEmptyReads;
+        };
+
+        // Thread 2 watches the log and updates the high watermark
+        Runnable hwUpdateLoop = () -> assertDoesNotThrow(() -> {
+            while (log.logEndOffset() < lastOffset) {
+                log.updateHighWatermark(log.logEndOffset());
+            }
+        });
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            executor.submit(hwUpdateLoop);
+
+            Future<Integer> future = executor.submit(txnWriteAndReadLoop);
+            int nonEmptyReads = future.get();
+
+            assertEquals(0, nonEmptyReads);
+        } finally {
+            executor.shutdownNow();
+            assertDoesNotThrow(() -> executor.awaitTermination(60, 
TimeUnit.SECONDS));
+        }
+    }
+
+    /**
+     * Interleaves four transactional producers with non-transactional appends and end-transaction markers,
+     * then checks the aborted transactions recorded in the segments' transaction indexes and the cached
+     * first unstable offset as the high watermark advances.
+     */
+    @Test
+    public void testTransactionIndexUpdated() throws IOException {
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(1024 * 1024 * 5)
+                .build();
+        UnifiedLog log = createLog(logDir, config);
+        short producerEpoch = 0;
+
+        long producerId1 = 1L;
+        long producerId2 = 2L;
+        long producerId3 = 3L;
+        long producerId4 = 4L;
+
+        Consumer<Integer> appendForProducer1 = LogTestUtils.appendTransactionalAsLeader(log, producerId1, producerEpoch, mockTime);
+        Consumer<Integer> appendForProducer2 = LogTestUtils.appendTransactionalAsLeader(log, producerId2, producerEpoch, mockTime);
+        Consumer<Integer> appendForProducer3 = LogTestUtils.appendTransactionalAsLeader(log, producerId3, producerEpoch, mockTime);
+        Consumer<Integer> appendForProducer4 = LogTestUtils.appendTransactionalAsLeader(log, producerId4, producerEpoch, mockTime);
+
+        // Interleave transactional and non-transactional appends. The trailing comment on each step is the
+        // next offset after that step.
+        appendForProducer1.accept(5);                          // 5
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);   // 8
+        appendForProducer2.accept(2);                          // 10
+        appendForProducer1.accept(4);                          // 14
+        appendForProducer3.accept(3);                          // 17
+        LogTestUtils.appendNonTransactionalAsLeader(log, 2);   // 19
+        appendForProducer1.accept(10);                         // 29
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, producerEpoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 30
+        appendForProducer2.accept(6);                          // 36
+        appendForProducer4.accept(3);                          // 39
+        LogTestUtils.appendNonTransactionalAsLeader(log, 10);  // 49
+        appendForProducer3.accept(9);                          // 58
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId3, producerEpoch, ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 59
+        appendForProducer4.accept(8);                          // 67
+        appendForProducer2.accept(7);                          // 74
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, producerEpoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 75
+        LogTestUtils.appendNonTransactionalAsLeader(log, 10);  // 85
+        appendForProducer4.accept(4);                          // 89
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId4, producerEpoch, ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel()); // 90
+
+        // Gather the aborted transactions from every segment's transaction index, in segment order.
+        List<AbortedTxn> actualAborted = new ArrayList<>();
+        for (LogSegment seg : log.logSegments()) {
+            actualAborted.addAll(seg.txnIndex().allAbortedTxns());
+        }
+        // Only the transactions of producers 1 and 2 were aborted.
+        List<AbortedTxn> expectedAborted = List.of(
+            new AbortedTxn(producerId1, 0L, 29L, 8L),
+            new AbortedTxn(producerId2, 8L, 74L, 36L)
+        );
+        assertEquals(expectedAborted, actualAborted);
+
+        // Verify caching of the segment position of the first unstable offset.
+        log.updateHighWatermark(30L);
+        assertCachedFirstUnstableOffset(log, 8L);
+
+        log.updateHighWatermark(75L);
+        assertCachedFirstUnstableOffset(log, 36L);
+
+        // With the high watermark at the log end, no first unstable offset remains.
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    /**
+     * For each base offset in {@code offsets}, creates an empty log file and an empty offset index file in
+     * {@code dir}, in argument order.
+     *
+     * @throws IOException if a file cannot be created (including when it already exists)
+     */
+    private void createEmptyLogs(File dir, int... offsets) throws IOException {
+        for (int i = 0; i < offsets.length; i++) {
+            long baseOffset = offsets[i];
+            Files.createFile(LogFileUtils.logFile(dir, baseOffset).toPath());
+            Files.createFile(LogFileUtils.offsetIndexFile(dir, baseOffset).toPath());
+        }
+    }
+
+    /**
+     * Collects every record in {@code log} by walking its segments, each segment's batches, and each batch's
+     * records in iteration order.
+     */
+    private static List<Record> allRecords(UnifiedLog log) {
+        List<Record> collected = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            for (RecordBatch batch : segment.log().batches()) {
+                for (Record entry : batch) {
+                    collected.add(entry);
+                }
+            }
+        }
+        return collected;
+    }
+
+    /** Asserts that {@code log} contains exactly {@code expected}, in the same order. */
+    private static void verifyRecordsInLog(UnifiedLog log, List<Record> expected) {
+        List<Record> actual = allRecords(log);
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Seeds {@code logDir} with an overflowed segment, opens it with {@code logConfig} into the {@code log}
+     * field, and returns the first segment of that log which has overflowed offsets.
+     *
+     * @throws AssertionError if the opened log has no overflowed segment
+     */
+    private LogSegment createLogWithOverflow(LogConfig logConfig) throws IOException {
+        LogTestUtils.initializeLogDirWithOverflowedSegment(logDir);
+        log = createLog(logDir, logConfig, Long.MAX_VALUE);
+        return LogTestUtils.firstOverflowSegment(log)
+            .orElseThrow(() -> new AssertionError("Failed to create log with a segment which has overflowed offsets"));
+    }
+
+    /**
+     * Writes {@code records} into a raw segment at {@code segmentBaseOffset}, opens the log, and splits its
+     * first overflowed segment. Asserts that the result is a single segment based at the first batch's base
+     * offset, holding the same records, with no remaining overflow.
+     */
+    private void testDegenerateSplitSegmentWithOverflow(long segmentBaseOffset, List<MemoryRecords> records) throws IOException {
+        try (FileRecords rawSegment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)) {
+            // Create the index files explicitly so that opening the log does not trigger segment recovery,
+            // which would truncate the segment.
+            Files.createFile(LogFileUtils.offsetIndexFile(logDir, segmentBaseOffset).toPath());
+            Files.createFile(LogFileUtils.timeIndexFile(logDir, segmentBaseOffset).toPath());
+            for (MemoryRecords batch : records) {
+                rawSegment.append(batch);
+            }
+        }
+
+        LogConfig splitConfig = new LogTestUtils.LogConfigBuilder()
+                .indexIntervalBytes(1)
+                .fileDeleteDelayMs(1000)
+                .build();
+        UnifiedLog log = createLog(logDir, splitConfig, Long.MAX_VALUE);
+
+        LogSegment overflowed = LogTestUtils.firstOverflowSegment(log)
+            .orElseThrow(() -> new AssertionError("Failed to create log with a segment which has overflowed offsets"));
+        List<Record> recordsBeforeSplit = allRecords(log);
+
+        log.splitOverflowedSegment(overflowed);
+
+        // A degenerate split yields exactly one segment, starting at the first batch's base offset.
+        assertEquals(1, log.numberOfSegments());
+        long expectedBaseOffset = records.get(0).batches().iterator().next().baseOffset();
+        assertEquals(expectedBaseOffset, log.activeSegment().baseOffset());
+        verifyRecordsInLog(log, recordsBeforeSplit);
+        assertFalse(LogTestUtils.hasOffsetOverflow(log));
+    }
+
+    /** A record timestamp paired with the leader epoch under which that record was appended. */
+    private record TimestampAndEpoch(long timestamp, int leaderEpoch) { }
+
+    /**
+     * Sets up shared fixtures for the remote-storage tests. In order, it:
+     * <ul>
+     *   <li>creates a spied {@link RemoteLogManager} with a list-offsets purgatory;</li>
+     *   <li>creates a remote-storage-enabled log starting at {@code logStartOffset};</li>
+     *   <li>checks the earliest-pending-upload lookup on the still-empty log;</li>
+     *   <li>appends three records, storing their timestamp/epoch pairs in {@code timestampAndEpochs}.</li>
+     * </ul>
+     */
+    private void prepare(int logStartOffset) throws IOException {
+        RemoteLogManagerConfig rlmConfig = createRemoteLogManagerConfig();
+        DelayedOperationPurgatory<DelayedRemoteListOffsets> listOffsetsPurgatory =
+            new DelayedOperationPurgatory<>("RemoteListOffsets", 0);
+        RemoteLogManager manager = new RemoteLogManager(
+            rlmConfig,
+            0,
+            logDir.getAbsolutePath(),
+            "clusterId",
+            mockTime,
+            topicPartition -> Optional.empty(),
+            (topicPartition, value) -> { },
+            brokerTopicStats,
+            new Metrics(),
+            Optional.empty()
+        );
+        remoteLogManager = spy(manager);
+        remoteLogManager.setDelayedOperationPurgatory(listOffsetsPurgatory);
+
+        LogConfig remoteEnabledConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(200)
+                .indexIntervalBytes(1)
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, remoteEnabledConfig, logStartOffset, 0L, brokerTopicStats,
+                mockTime.scheduler, mockTime, producerStateManagerConfig, true, Optional.empty(), true);
+
+        // On the empty log, the earliest-pending-upload lookup is expected to return the log start offset
+        // with an unknown timestamp.
+        FileRecords.TimestampAndOffset expectedForEmptyLog =
+            new FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset, Optional.empty());
+        assertFetchOffsetBySpecialTimestamp(log, Optional.of(remoteLogManager), expectedForEmptyLog,
+            ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
+        timestampAndEpochs = prepareLogWithSequentialRecords(log, 3);
+    }
+
+    /**
+     * Appends {@code recordCount} single-record batches to {@code log} as leader. Record {@code i} gets the
+     * mock clock's current time plus {@code i} as its timestamp, and is appended under leader epoch {@code i}.
+     *
+     * @return the timestamp/epoch pair used for each appended record, in append order
+     */
+    private List<TimestampAndEpoch> prepareLogWithSequentialRecords(UnifiedLog log, int recordCount) throws IOException {
+        long baseTimestamp = mockTime.milliseconds();
+        List<TimestampAndEpoch> appended = new ArrayList<>();
+
+        int epoch = 0;
+        while (epoch < recordCount) {
+            TimestampAndEpoch entry = new TimestampAndEpoch(baseTimestamp + epoch, epoch);
+            log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10), entry.timestamp()), entry.leaderEpoch());
+            appended.add(entry);
+            epoch++;
+        }
+
+        return appended;
+    }
+
+    private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log, 
+                                                      
Optional<RemoteLogManager> remoteLogManagerOpt,
+                                                      
FileRecords.TimestampAndOffset expected, 
+                                                      long timestamp) {
+        Optional<AsyncOffsetReader> remoteOffsetReader = 
remoteLogManagerOpt.map(rlm -> rlm);
+        OffsetResultHolder offsetResultHolder = 
log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
+        assertEquals(new OffsetResultHolder(expected), offsetResultHolder);
+    }
+
+    /**
+     * Asserts three things about {@code log.fetchOffsetByTimestamp(timestamp, ...)}:
+     * <ul>
+     *   <li>it returns a pending future (using {@code remoteLogManagerOpt}, if present, as the remote reader);</li>
+     *   <li>that future completes within one second;</li>
+     *   <li>its result carries exactly {@code expected}.</li>
+     * </ul>
+     */
+    private void assertFetchOffsetByTimestamp(UnifiedLog log,
+                                               Optional<RemoteLogManager> remoteLogManagerOpt,
+                                               Optional<FileRecords.TimestampAndOffset> expected,
+                                               long timestamp) throws Exception {
+        Optional<AsyncOffsetReader> remoteOffsetReader = remoteLogManagerOpt.map(rlm -> rlm);
+        OffsetResultHolder offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
+        assertTrue(offsetResultHolder.futureHolderOpt().isPresent());
+
+        // Wait (bounded) for the lookup once and reuse that result for every check below. A successful
+        // timed get() already implies the future is done.
+        OffsetResultHolder.FileRecordsOrError result =
+            offsetResultHolder.futureHolderOpt().get().taskFuture().get(1, TimeUnit.SECONDS);
+        assertTrue(result.hasTimestampAndOffset());
+        // Compare the Optionals directly, so an empty `expected` produces an assertion failure rather than
+        // a NoSuchElementException from expected.get().
+        assertEquals(expected, result.timestampAndOffset());
+    }
+
+    /**
+     * Asserts that the log's producer state manager reports a first unstable offset equal to
+     * {@code expectedOffset}. Also asserts that this offset metadata is not message-offset-only and is valid
+     * for {@code log}.
+     */
+    private void assertCachedFirstUnstableOffset(UnifiedLog log, long expectedOffset) throws IOException {
+        Optional<LogOffsetMetadata> maybeFirstUnstable = log.producerStateManager().firstUnstableOffset();
+        assertTrue(maybeFirstUnstable.isPresent());
+
+        LogOffsetMetadata firstUnstable = maybeFirstUnstable.get();
+        assertEquals(expectedOffset, firstUnstable.messageOffset);
+        assertFalse(firstUnstable.messageOffsetOnly());
+        assertValidLogOffsetMetadata(log, firstUnstable);
+    }
+
     private void assertFetchOffsetByTimestamp(RemoteLogManager 
remoteLogManager,
                                                FileRecords.TimestampAndOffset 
expected,
                                                long timestamp,

Reply via email to