This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07c87a8322a KAFKA-19752 Move parts of UnifiedLogTest to storage module 
(#21807)
07c87a8322a is described below

commit 07c87a8322ab85ba474a35acc1458e6701f45f00
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 31 17:02:24 2026 +0800

    KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21807)
    
    testMaybeUpdateHighWatermarkAsFollower ~
    testNonZeroSequenceOnFirstAppendNonZeroEpoch
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 497 +-------------------
 .../storage/internals/log/UnifiedLogTest.java      | 510 ++++++++++++++++++++-
 2 files changed, 499 insertions(+), 508 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 12be8551252..69c3e3efb69 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -20,25 +20,19 @@ package kafka.log
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.internal._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{MockTime, Scheduler}
-
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, 
UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, 
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, 
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doThrow, spy}
 
@@ -80,493 +74,6 @@ class UnifiedLogTest {
     }
   }
 
-  @Test
-  def testMaybeUpdateHighWatermarkAsFollower(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig()
-    val log = createLog(logDir, logConfig)
-
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
-      log.appendAsLeader(records, 0)
-    }
-
-    assertEquals(Optional.of(99L), log.maybeUpdateHighWatermark(99L))
-    assertEquals(Optional.empty, log.maybeUpdateHighWatermark(99L))
-
-    assertEquals(Optional.of(100L), log.maybeUpdateHighWatermark(100L))
-    assertEquals(Optional.empty, log.maybeUpdateHighWatermark(100L))
-
-    // bound by the log end offset
-    assertEquals(Optional.empty, log.maybeUpdateHighWatermark(101L))
-  }
-
-  @Test
-  def testEnableRemoteLogStorageOnCompactedTopics(): Unit = {
-      var logConfig = LogTestUtils.createLogConfig()
-      var log = createLog(logDir, logConfig)
-      assertFalse(log.remoteLogEnabled())
-
-      log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-      assertFalse(log.remoteLogEnabled())
-
-      logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
-      log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-      assertTrue(log.remoteLogEnabled())
-
-      logConfig = LogTestUtils.createLogConfig(cleanupPolicy = 
TopicConfig.CLEANUP_POLICY_COMPACT, remoteLogStorageEnable = true)
-      log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-      assertFalse(log.remoteLogEnabled())
-
-      logConfig = LogTestUtils.createLogConfig(cleanupPolicy = 
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE,
-        remoteLogStorageEnable = true)
-      log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-      assertFalse(log.remoteLogEnabled())
-    }
-
-    @Test
-    def testRemoteLogStorageIsDisabledOnInternalAndRemoteLogMetadataTopic(): 
Unit = {
-      val partitions = 
Seq(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
-        Topic.TRANSACTION_STATE_TOPIC_NAME, Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .map(topic => new TopicPartition(topic, 0))
-      for (partition <- partitions) {
-        val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = 
true)
-        val internalLogDir = new File(TestUtils.tempDir(), partition.toString)
-        internalLogDir.mkdir()
-        val log = createLog(internalLogDir, logConfig, 
remoteStorageSystemEnable = true)
-        assertFalse(log.remoteLogEnabled())
-      }
-    }
-
-    @Test
-    def testNoOpWhenRemoteLogStorageIsDisabled(): Unit = {
-      val logConfig = LogTestUtils.createLogConfig()
-      val log = createLog(logDir, logConfig)
-
-      for (i <- 0 until 100) {
-        val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
-        log.appendAsLeader(records, 0)
-      }
-
-      log.updateHighWatermark(90L)
-      log.maybeIncrementLogStartOffset(20L, 
LogStartOffsetIncrementReason.SegmentDeletion)
-      assertEquals(20, log.logStartOffset)
-    }
-
-  @Test
-  def testStartOffsetsRemoteLogStorageIsEnabled(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
-    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
-      log.appendAsLeader(records, 0)
-    }
-
-    log.updateHighWatermark(80L)
-    val newLogStartOffset = 40L
-    log.maybeIncrementLogStartOffset(newLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
-    assertEquals(newLogStartOffset, log.logStartOffset)
-    assertEquals(log.logStartOffset, log.localLogStartOffset())
-
-    // Truncate the local log and verify that the offsets are updated to 
expected values
-    val newLocalLogStartOffset = 60L
-    log.truncateFullyAndStartAt(newLocalLogStartOffset, 
Optional.of(newLogStartOffset))
-    assertEquals(newLogStartOffset, log.logStartOffset)
-    assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
-  }
-
-  private class MockLogOffsetsListener extends LogOffsetsListener {
-    private var highWatermark: Long = -1L
-
-    override def onHighWatermarkUpdated(offset: Long): Unit = {
-      highWatermark = offset
-    }
-
-    private def clear(): Unit = {
-      highWatermark = -1L
-    }
-
-    /**
-     * Verifies the callbacks that have been triggered since the last
-     * verification. Values different than `-1` are the ones that have
-     * been updated.
-     */
-    def verify(expectedHighWatermark: Long = -1L): Unit = {
-      assertEquals(expectedHighWatermark, highWatermark,
-        "Unexpected high watermark")
-      clear()
-    }
-  }
-
-  @Test
-  def testLogOffsetsListener(): Unit = {
-    def records(offset: Long): MemoryRecords = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ), baseOffset = offset, partitionLeaderEpoch = 0)
-
-    val listener = new MockLogOffsetsListener()
-    listener.verify()
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig, logOffsetsListener = listener)
-
-    listener.verify(expectedHighWatermark = 0)
-
-    log.appendAsLeader(records(0), 0)
-    log.appendAsLeader(records(0), 0)
-
-    log.maybeIncrementHighWatermark(new LogOffsetMetadata(4))
-    listener.verify(expectedHighWatermark = 4)
-
-    log.truncateTo(3)
-    listener.verify(expectedHighWatermark = 3)
-
-    log.appendAsLeader(records(0), 0)
-    log.truncateFullyAndStartAt(4, Optional.empty)
-    listener.verify(expectedHighWatermark = 4)
-  }
-
-  @Test
-  def testUpdateLogOffsetsListener(): Unit = {
-    def records(offset: Long): MemoryRecords = TestUtils.records(List(
-      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
-      new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
-    ), baseOffset = offset, partitionLeaderEpoch = 0)
-
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(records(0), 0)
-    log.maybeIncrementHighWatermark(new LogOffsetMetadata(2))
-    log.maybeIncrementLogStartOffset(1, 
LogStartOffsetIncrementReason.SegmentDeletion)
-
-    val listener = new MockLogOffsetsListener()
-    listener.verify()
-
-    log.setLogOffsetsListener(listener)
-    listener.verify() // it is still empty because we don't call the listener 
when it is set.
-
-    log.appendAsLeader(records(0), 0)
-    log.maybeIncrementHighWatermark(new LogOffsetMetadata(4))
-    listener.verify(expectedHighWatermark = 4)
-  }
-
-  @ParameterizedTest
-  @EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT", 
"COORDINATOR"))
-  def testTransactionIsOngoingAndVerificationGuardTV2(appendOrigin: 
AppendOrigin): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    // For TV2, when there's no existing producer state, sequence must be 0 
for both CLIENT and COORDINATOR
-    var sequence = 0
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-    
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
-
-    val idempotentRecords = MemoryRecords.withIdempotentRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-
-    // Only clients have nonzero sequences
-    if (appendOrigin == AppendOrigin.CLIENT)
-      sequence = sequence + 2
-
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
sequence, producerEpoch, true)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    log.appendAsLeader(idempotentRecords, 0, appendOrigin)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-
-    // Since we wrote idempotent records, we keep VerificationGuard.
-    assertEquals(verificationGuard, log.verificationGuard(producerId))
-
-    // Now write the transactional records
-    assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
-    log.appendAsLeader(transactionalRecords, 0, appendOrigin, 
RequestLocal.noCaching(), verificationGuard,
-      TransactionVersion.TV_2.featureLevel())
-    assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
-    // VerificationGuard should be cleared now.
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    // A subsequent maybeStartTransactionVerification will be empty since we 
are already verified.
-    assertEquals(VerificationGuard.SENTINEL, 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
true))
-
-    // For TV2, the coordinator bumps the epoch before writing the marker 
(KIP-890)
-    val bumpedEpoch = (producerEpoch + 1).toShort
-    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
-      producerId,
-      bumpedEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
-    )
-
-    log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
-      RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel())
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    if (appendOrigin == AppendOrigin.CLIENT)
-      sequence = sequence + 1
-
-    // A new maybeStartTransactionVerification will not be empty, as we need 
to verify the next transaction.
-    // For TV2, after the marker is written with bumped epoch, the producer 
state now has the bumped epoch
-    val newVerificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, bumpedEpoch, true)
-    assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
-    assertNotEquals(verificationGuard, newVerificationGuard)
-    assertFalse(verificationGuard.verify(newVerificationGuard))
-  }
-
-  @ParameterizedTest
-  @EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT", 
"COORDINATOR"))
-  def testTransactionIsOngoingAndVerificationGuardTV1(appendOrigin: 
AppendOrigin): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
false)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    // For TV1, can start with non-zero sequences even with non-zero epoch 
when no existing producer state
-    var sequence = if (appendOrigin == AppendOrigin.CLIENT) 3 else 0
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-    
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
-
-    val idempotentRecords = MemoryRecords.withIdempotentRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-
-    // Only clients have nonzero sequences
-    if (appendOrigin == AppendOrigin.CLIENT)
-      sequence = sequence + 2
-
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-
-    // For TV1, create verification guard with supportsEpochBump=false
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
sequence, producerEpoch, false)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    log.appendAsLeader(idempotentRecords, 0, appendOrigin)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-
-    // Since we wrote idempotent records, we keep VerificationGuard.
-    assertEquals(verificationGuard, log.verificationGuard(producerId))
-
-    // Now write the transactional records
-    assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
-    log.appendAsLeader(transactionalRecords, 0, appendOrigin, 
RequestLocal.noCaching(), verificationGuard,
-      TransactionVersion.TV_1.featureLevel())
-    assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
-    // VerificationGuard should be cleared now.
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    // A subsequent maybeStartTransactionVerification will be empty since we 
are already verified.
-    assertEquals(VerificationGuard.SENTINEL, 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false))
-
-    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
-      producerId,
-      producerEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
-    )
-
-    log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
-      RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_1.featureLevel())
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    if (appendOrigin == AppendOrigin.CLIENT)
-      sequence = sequence + 1
-
-    // A new maybeStartTransactionVerification will not be empty, as we need 
to verify the next transaction.
-    val newVerificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false)
-    assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
-    assertNotEquals(verificationGuard, newVerificationGuard)
-    assertFalse(verificationGuard.verify(newVerificationGuard))
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testEmptyTransactionStillClearsVerificationGuard(supportsEpochBump: 
Boolean): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
0, producerEpoch, supportsEpochBump)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    val endMarkerProducerEpoch = if (supportsEpochBump) (producerEpoch + 
1).toShort else producerEpoch
-    val transactionVersion = if (supportsEpochBump) 
TransactionVersion.TV_2.featureLevel() else 
TransactionVersion.TV_1.featureLevel()
-    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
-      producerId,
-      endMarkerProducerEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
-    )
-
-    log.appendAsLeader(endTransactionMarkerRecord, 0, 
AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
-      VerificationGuard.SENTINEL, transactionVersion)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-  }
-
-  @Test
-  def testNextTransactionVerificationGuardNotCleared(): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
0, producerEpoch, true)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    // If the producer epoch is the same on the EndTxn marker, the 
verification must be for the next transaction, so we shouldn't clear it.
-    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
-      producerId,
-      producerEpoch,
-      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
-    )
-
-    log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
-      RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_0.featureLevel())
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(verificationGuard, log.verificationGuard(producerId))
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testDisabledVerificationClearsVerificationGuard(supportsEpochBump: 
Boolean): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
0, producerEpoch, supportsEpochBump)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    producerStateManagerConfig.setTransactionVerificationEnabled(false)
-
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      0,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-    log.appendAsLeader(transactionalRecords, 0)
-
-    assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-  }
-
-  @Test
-  def testEnablingVerificationWhenRequestIsAtLogLayer(): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
false)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    val sequence = 0
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-
-    producerStateManagerConfig.setTransactionVerificationEnabled(true)
-
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-    assertThrows(classOf[InvalidTxnStateException], () => 
log.appendAsLeader(transactionalRecords, 0))
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
sequence, producerEpoch, true)
-    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
-    log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching,
-      verificationGuard, TransactionVersion.TV_2.featureLevel())
-    assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def 
testNonZeroSequenceOnFirstAppendNonZeroEpoch(transactionVerificationEnabled: 
Boolean): Unit = {
-    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
transactionVerificationEnabled)
-
-    val producerId = 23L
-    val producerEpoch = 1.toShort
-    val sequence = 3
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
-    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
-    assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-    assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
-    val transactionalRecords = MemoryRecords.withTransactionalRecords(
-      Compression.NONE,
-      producerId,
-      producerEpoch,
-      sequence,
-      new SimpleRecord("1".getBytes),
-      new SimpleRecord("2".getBytes)
-    )
-
-    if (transactionVerificationEnabled) {
-      // TV2 behavior: Create verification state that supports epoch bumps
-      val verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, true)
-      // Should reject non-zero sequences when there's no existing producer 
state
-      assertThrows(classOf[OutOfOrderSequenceException], () => 
-        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, verificationGuard,
-          TransactionVersion.TV_0.featureLevel()))
-    } else {
-      // TV1 behavior: Create verification state with supportsEpochBump=false
-      val verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false)
-      // Should allow non-zero sequences with non-zero epoch
-      log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching, verificationGuard,
-        TransactionVersion.TV_0.featureLevel())
-      assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
-    }
-  }
-
   @Test
   def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): 
Unit = {
     val logConfig = LogTestUtils.createLogConfig()
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index bcefc6679aa..eb6957f13ac 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -25,12 +25,14 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.InconsistentTopicIdException;
 import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.AbortedTxn;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
@@ -56,6 +58,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.server.common.TransactionVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
@@ -85,6 +88,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -123,6 +127,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1447,7 +1452,7 @@ public class UnifiedLogTest {
             public MemoryRecords.RecordFilter.BatchRetentionResult 
checkBatchRetention(RecordBatch batch) {
                 return new 
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
 false);
             }
-            @Override 
+            @Override
             public boolean shouldRetainRecord(RecordBatch recordBatch, Record 
record) {
                 return !record.hasKey();
             }
@@ -1496,7 +1501,7 @@ public class UnifiedLogTest {
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
-            @Override 
+            @Override
             public MemoryRecords.RecordFilter.BatchRetentionResult 
checkBatchRetention(RecordBatch batch) {
                 return new 
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY,
 true);
             }
@@ -1679,7 +1684,7 @@ public class UnifiedLogTest {
         log.updateHighWatermark(log.logEndOffset());
         log.maybeIncrementLogStartOffset(1L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
         assertEquals(
-                2, 
+                2,
                 log.deleteOldSegments(),
                 "Expecting two segment deletions as log start offset retention 
should unblock time based retention"
         );
@@ -1750,7 +1755,7 @@ public class UnifiedLogTest {
                 .sorted()
                 .collect(Collectors.toList());
         assertEquals(
-                expectedSnapshotOffsets, 
+                expectedSnapshotOffsets,
                 snapshotOffsets,
                 "expected a snapshot file per segment base offset, except the 
first segment"
         );
@@ -2415,6 +2420,28 @@ public class UnifiedLogTest {
                 producerStateManagerConfig, true, Optional.empty(), false);
     }
 
+    private UnifiedLog createLog(File dir, LogConfig config, 
LogOffsetsListener logOffsetsListener) throws IOException {
+        UnifiedLog log = UnifiedLog.create(
+                dir, config, 0L, 0L, mockTime.scheduler, brokerTopicStats, 
mockTime,
+                3600000, producerStateManagerConfig,
+                
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                new LogDirFailureChannel(10), true, Optional.empty(), new 
ConcurrentHashMap<>(),
+                false, logOffsetsListener);
+        logsToClose.add(log);
+        return log;
+    }
+
+    private UnifiedLog createLog(File dir, LogConfig config, 
ProducerStateManagerConfig psmConfig) throws IOException {
+        UnifiedLog log = UnifiedLog.create(
+                dir, config, 0L, 0L, mockTime.scheduler, brokerTopicStats, 
mockTime,
+                3600000, psmConfig,
+                
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+                new LogDirFailureChannel(10), true, Optional.empty(), new 
ConcurrentHashMap<>(),
+                false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+        logsToClose.add(log);
+        return log;
+    }
+
     private UnifiedLog createLog(
             File dir,
             LogConfig config,
@@ -2579,7 +2606,7 @@ public class UnifiedLogTest {
                 .build();
         log = createLog(logDir, logConfig, true);
 
-        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() + 
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() +
                 ",partition=" + log.topicPartition().partition();
 
         // Append some messages to create 3 segments (15 records / 5 records 
per segment = 3 segments)
@@ -2618,7 +2645,7 @@ public class UnifiedLogTest {
                 .build();
         log = createLog(logDir, logConfig, false);
 
-        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() + 
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() +
                 ",partition=" + log.topicPartition().partition();
 
         for (int i = 0; i < 10; i++) {
@@ -2692,7 +2719,7 @@ public class UnifiedLogTest {
         assertEquals(200, yammerMetricValue(metricName),
                 "Metric should be updated in finally block even when exception 
occurs");
     }
-    
+
     @Test
     public void testReadWithMinMessage() throws IOException {
         LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
@@ -2810,7 +2837,7 @@ public class UnifiedLogTest {
                 Compression.NONE,
                 new SimpleRecord(mockTime.milliseconds(), null, 
"Test".getBytes())
         );
-        
+
         log.appendAsLeader(message, 0);
         log.roll();
         assertEquals(2, logDir.listFiles(f -> 
f.getName().endsWith(".log")).length);
@@ -3028,7 +3055,7 @@ public class UnifiedLogTest {
         log.appendAsLeader(first, 0);
 
         assertThrows(
-                RecordTooLargeException.class, 
+                RecordTooLargeException.class,
                 () -> log.appendAsLeader(second, 0),
                 "Second message set should throw MessageSizeTooLargeException."
         );
@@ -3436,7 +3463,7 @@ public class UnifiedLogTest {
         UnifiedLog log = createLog(logDir, logConfig);
 
         // Test initial state before any records
-        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(), 
+        assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
             new 
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, 
Optional.of(-1)),
             ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
 
@@ -4230,7 +4257,7 @@ public class UnifiedLogTest {
 
     private void prepare(int logStartOffset) throws IOException {
         RemoteLogManagerConfig config = createRemoteLogManagerConfig();
-        DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory = 
+        DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory =
             new DelayedOperationPurgatory<>("RemoteListOffsets", 0);
         remoteLogManager = spy(new RemoteLogManager(
             config,
@@ -4279,9 +4306,9 @@ public class UnifiedLogTest {
         return result;
     }
 
-    private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log, 
+    private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log,
                                                       
Optional<RemoteLogManager> remoteLogManagerOpt,
-                                                      
FileRecords.TimestampAndOffset expected, 
+                                                      
FileRecords.TimestampAndOffset expected,
                                                       long timestamp) {
         Optional<AsyncOffsetReader> remoteOffsetReader = 
remoteLogManagerOpt.map(rlm -> rlm);
         OffsetResultHolder offsetResultHolder = 
log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
@@ -5010,4 +5037,461 @@ public class UnifiedLogTest {
         }
         builder.close();
     }
+
+    @Test
+    public void testMaybeUpdateHighWatermarkAsFollower() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = singletonRecords(("test" + i).getBytes());
+            log.appendAsLeader(records, 0);
+        }
+
+        assertEquals(Optional.of(99L), log.maybeUpdateHighWatermark(99L));
+        assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(99L));
+
+        assertEquals(Optional.of(100L), log.maybeUpdateHighWatermark(100L));
+        assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(100L));
+
+        // bound by the log end offset
+        assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(101L));
+    }
+
+    @Test
+    public void testEnableRemoteLogStorageOnCompactedTopics() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        assertFalse(log.remoteLogEnabled());
+
+        log = createLog(logDir, logConfig, true);
+        assertFalse(log.remoteLogEnabled());
+
+        logConfig = new 
LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+        log = createLog(logDir, logConfig, true);
+        assertTrue(log.remoteLogEnabled());
+
+        logConfig = new LogTestUtils.LogConfigBuilder()
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, logConfig, true);
+        assertFalse(log.remoteLogEnabled());
+
+        logConfig = new LogTestUtils.LogConfigBuilder()
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT + "," + 
TopicConfig.CLEANUP_POLICY_DELETE)
+                .remoteLogStorageEnable(true)
+                .build();
+        log = createLog(logDir, logConfig, true);
+        assertFalse(log.remoteLogEnabled());
+    }
+
+    @Test
+    public void 
testRemoteLogStorageIsDisabledOnInternalAndRemoteLogMetadataTopic() throws 
IOException {
+        List<TopicPartition> partitions = List.of(
+                new 
TopicPartition(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
 0),
+                new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+                new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0),
+                new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0),
+                new TopicPartition(Topic.CLUSTER_METADATA_TOPIC_NAME, 0)
+        );
+        for (TopicPartition partition : partitions) {
+            LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+            File internalLogDir = new File(TestUtils.tempDirectory(), 
partition.toString());
+            internalLogDir.mkdir();
+            UnifiedLog log = createLog(internalLogDir, logConfig, true);
+            assertFalse(log.remoteLogEnabled());
+        }
+    }
+
+    @Test
+    public void testNoOpWhenRemoteLogStorageIsDisabled() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = singletonRecords(("test" + i).getBytes());
+            log.appendAsLeader(records, 0);
+        }
+
+        log.updateHighWatermark(90L);
+        log.maybeIncrementLogStartOffset(20L, 
LogStartOffsetIncrementReason.SegmentDeletion);
+        assertEquals(20, log.logStartOffset());
+    }
+
+    @Test
+    public void testStartOffsetsRemoteLogStorageIsEnabled() throws IOException 
{
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+        UnifiedLog log = createLog(logDir, logConfig, true);
+
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = singletonRecords(("test" + i).getBytes());
+            log.appendAsLeader(records, 0);
+        }
+
+        log.updateHighWatermark(80L);
+        long newLogStartOffset = 40L;
+        log.maybeIncrementLogStartOffset(newLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion);
+        assertEquals(newLogStartOffset, log.logStartOffset());
+        assertEquals(log.logStartOffset(), log.localLogStartOffset());
+
+        // Truncate the local log and verify that the offsets are updated to 
expected values
+        long newLocalLogStartOffset = 60L;
+        log.truncateFullyAndStartAt(newLocalLogStartOffset, 
Optional.of(newLogStartOffset));
+        assertEquals(newLogStartOffset, log.logStartOffset());
+        assertEquals(newLocalLogStartOffset, log.localLogStartOffset());
+    }
+
+    @Test
+    public void testLogOffsetsListener() throws IOException {
+        MockLogOffsetsListener listener = new MockLogOffsetsListener();
+        listener.verify();
+
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig, listener);
+
+        listener.verify(0L);
+
+        log.appendAsLeader(records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), 0L, 0), 0);
+        log.appendAsLeader(records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), 0L, 0), 0);
+
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(4));
+        listener.verify(4L);
+
+        log.truncateTo(3);
+        listener.verify(3L);
+
+        log.appendAsLeader(records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), 0L, 0), 0);
+        log.truncateFullyAndStartAt(4, Optional.empty());
+        listener.verify(4L);
+    }
+
+    private static class MockLogOffsetsListener implements LogOffsetsListener {
+        private long highWatermark = -1L;
+
+        @Override
+        public void onHighWatermarkUpdated(long offset) {
+            highWatermark = offset;
+        }
+
+        /**
+         * Verifies the callbacks that have been triggered since the last
+         * verification. Values different from {@code -1} are the ones that 
have
+         * been updated.
+         */
+        public void verify(long expectedHighWatermark) {
+            assertEquals(expectedHighWatermark, highWatermark, "Unexpected 
high watermark");
+            highWatermark = -1L;
+        }
+
+        public void verify() {
+            verify(-1L);
+        }
+    }
+
+    @Test
+    public void testUpdateLogOffsetsListener() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), 0L, 0), 0);
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(2));
+        log.maybeIncrementLogStartOffset(1, 
LogStartOffsetIncrementReason.SegmentDeletion);
+
+        MockLogOffsetsListener listener = new MockLogOffsetsListener();
+        listener.verify();
+
+        log.setLogOffsetsListener(listener);
+        listener.verify(); // it is still empty because we don't call the 
listener when it is set.
+
+        log.appendAsLeader(records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), 0L, 0), 0);
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(4));
+        listener.verify(4L);
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = AppendOrigin.class, names = {"CLIENT", "COORDINATOR"})
+    public void testTransactionIsOngoingAndVerificationGuardTV2(AppendOrigin 
appendOrigin) throws IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, true);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        // For TV2, when there's no existing producer state, sequence must be 
0 for both CLIENT and COORDINATOR
+        int sequence = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+        
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL));
+
+        MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+
+        // Only clients have nonzero sequences
+        if (appendOrigin == AppendOrigin.CLIENT)
+            sequence += 2;
+
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        log.appendAsLeader(idempotentRecords, 0, appendOrigin);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+
+        // Since we wrote idempotent records, we keep VerificationGuard.
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+
+        // Now write the transactional records
+        
assertTrue(log.verificationGuard(producerId).verify(verificationGuard));
+        log.appendAsLeader(transactionalRecords, 0, appendOrigin, 
RequestLocal.noCaching(),
+                verificationGuard, TransactionVersion.TV_2.featureLevel());
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        // VerificationGuard should be cleared now.
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        // A subsequent maybeStartTransactionVerification will be empty since 
we are already verified.
+        assertEquals(VerificationGuard.SENTINEL, 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
true));
+
+        // For TV2, the coordinator bumps the epoch before writing the marker 
(KIP-890)
+        short bumpedEpoch = (short) (producerEpoch + 1);
+        MemoryRecords endTransactionMarkerRecord = 
MemoryRecords.withEndTransactionMarker(
+                producerId, bumpedEpoch, new 
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, 
AppendOrigin.COORDINATOR,
+                RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_2.featureLevel());
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        if (appendOrigin == AppendOrigin.CLIENT)
+            sequence += 1;
+
+        // A new maybeStartTransactionVerification will not be empty, as we 
need to verify the next transaction.
+        // For TV2, after the marker is written with bumped epoch, the 
producer state now has the bumped epoch
+        VerificationGuard newVerificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, bumpedEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard);
+        assertNotEquals(verificationGuard, newVerificationGuard);
+        assertFalse(verificationGuard.verify(newVerificationGuard));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = AppendOrigin.class, names = {"CLIENT", "COORDINATOR"})
+    public void testTransactionIsOngoingAndVerificationGuardTV1(AppendOrigin 
appendOrigin) throws IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, false);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        // For TV1, can start with non-zero sequences even with non-zero epoch 
when no existing producer state
+        int sequence = appendOrigin == AppendOrigin.CLIENT ? 3 : 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+        
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL));
+
+        MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+
+        // Only clients have nonzero sequences
+        if (appendOrigin == AppendOrigin.CLIENT)
+            sequence += 2;
+
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+
+        // For TV1, create verification guard with supportsEpochBump=false
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        log.appendAsLeader(idempotentRecords, 0, appendOrigin);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+
+        // Since we wrote idempotent records, we keep VerificationGuard.
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+
+        // Now write the transactional records
+        
assertTrue(log.verificationGuard(producerId).verify(verificationGuard));
+        log.appendAsLeader(transactionalRecords, 0, appendOrigin, 
RequestLocal.noCaching(),
+                verificationGuard, TransactionVersion.TV_1.featureLevel());
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        // VerificationGuard should be cleared now.
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        // A subsequent maybeStartTransactionVerification will be empty since 
we are already verified.
+        assertEquals(VerificationGuard.SENTINEL, 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false));
+
+        MemoryRecords endTransactionMarkerRecord = 
MemoryRecords.withEndTransactionMarker(
+                producerId, producerEpoch, new 
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, 
AppendOrigin.COORDINATOR,
+                RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_1.featureLevel());
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        if (appendOrigin == AppendOrigin.CLIENT)
+            sequence += 1;
+
+        // A new maybeStartTransactionVerification will not be empty, as we 
need to verify the next transaction.
+        VerificationGuard newVerificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
false);
+        assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard);
+        assertNotEquals(verificationGuard, newVerificationGuard);
+        assertFalse(verificationGuard.verify(newVerificationGuard));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEmptyTransactionStillClearsVerificationGuard(boolean 
supportsEpochBump) throws IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, true);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, 0, producerEpoch, 
supportsEpochBump);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        short endMarkerProducerEpoch = supportsEpochBump ? (short) 
(producerEpoch + 1) : producerEpoch;
+        short transactionVersion = supportsEpochBump ? 
TransactionVersion.TV_2.featureLevel() : TransactionVersion.TV_1.featureLevel();
+        MemoryRecords endTransactionMarkerRecord = 
MemoryRecords.withEndTransactionMarker(
+                producerId, endMarkerProducerEpoch, new 
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, 
AppendOrigin.COORDINATOR,
+                RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
transactionVersion);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+    }
+
+    @Test
+    public void testNextTransactionVerificationGuardNotCleared() throws 
IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, true);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, 0, producerEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        // If the producer epoch is the same on the EndTxn marker, the 
verification must be for the next transaction, so we shouldn't clear it.
+        MemoryRecords endTransactionMarkerRecord = 
MemoryRecords.withEndTransactionMarker(
+                producerId, producerEpoch, new 
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, 
AppendOrigin.COORDINATOR,
+                RequestLocal.noCaching(), VerificationGuard.SENTINEL, 
TransactionVersion.TV_0.featureLevel());
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDisabledVerificationClearsVerificationGuard(boolean 
supportsEpochBump) throws IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, true);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, 0, producerEpoch, 
supportsEpochBump);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        psmConfig.setTransactionVerificationEnabled(false);
+
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, producerEpoch, 0,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+        log.appendAsLeader(transactionalRecords, 0);
+
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+    }
+
+    @Test
+    public void testEnablingVerificationWhenRequestIsAtLogLayer() throws 
IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, false);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        int sequence = 0;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+
+        psmConfig.setTransactionVerificationEnabled(true);
+
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+        assertThrows(InvalidTxnStateException.class, () -> 
log.appendAsLeader(transactionalRecords, 0));
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, 
true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(),
+                verificationGuard, TransactionVersion.TV_2.featureLevel());
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testNonZeroSequenceOnFirstAppendNonZeroEpoch(boolean 
transactionVerificationEnabled) throws IOException {
+        ProducerStateManagerConfig psmConfig = new 
ProducerStateManagerConfig(86400000, transactionVerificationEnabled);
+
+        long producerId = 23L;
+        short producerEpoch = 1;
+        int sequence = 3;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, 
log.verificationGuard(producerId));
+
+        MemoryRecords transactionalRecords = 
MemoryRecords.withTransactionalRecords(
+                Compression.NONE, producerId, producerEpoch, sequence,
+                new SimpleRecord("1".getBytes()), new 
SimpleRecord("2".getBytes()));
+
+        VerificationGuard verificationGuard = 
log.maybeStartTransactionVerification(producerId, sequence,
+                producerEpoch, transactionVerificationEnabled);
+        if (transactionVerificationEnabled) {
+            // TV2 behavior: Create verification state that supports epoch 
bumps
+            // Should reject non-zero sequences when there's no existing 
producer state
+            assertThrows(OutOfOrderSequenceException.class, () ->
+                    log.appendAsLeader(transactionalRecords, 0, 
AppendOrigin.CLIENT, RequestLocal.noCaching(),
+                            verificationGuard, 
TransactionVersion.TV_0.featureLevel()));
+        } else {
+            // TV1 behavior: Create verification state with 
supportsEpochBump=false
+            // Should allow non-zero sequences with non-zero epoch
+            log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, 
RequestLocal.noCaching(),
+                    verificationGuard, TransactionVersion.TV_0.featureLevel());
+            assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        }
+    }
 }

Reply via email to