This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c42359579b KAFKA-19752: Move UnifiedLogTest to storage module (#21844)
3c42359579b is described below
commit 3c42359579bba930d254a69812ac6b04b179628d
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Mar 26 21:29:44 2026 +0800
KAFKA-19752: Move UnifiedLogTest to storage module (#21844)
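Moves the tests from testTransactionIndexUpdatedThroughReplication through
testRenamingDirWithoutReinitialization, together with their buffer-append
helper methods, out of the Scala UnifiedLogTest in core and into the Java
UnifiedLogTest in the storage module.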
Reviewers: Christo Lolov <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 723 +--------------------
.../kafka/storage/internals/log/LogTestUtils.java | 9 +
.../storage/internals/log/UnifiedLogTest.java | 667 +++++++++++++++++++
3 files changed, 678 insertions(+), 721 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 18c1f29002f..12be8551252 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -24,22 +24,18 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.common.message.AbortedTxn
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener,
LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig,
UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
+import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
@@ -47,7 +43,6 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doThrow, spy}
import java.io._
-import java.nio.ByteBuffer
import java.nio.file.Files
import java.util
import java.util.concurrent.ConcurrentHashMap
@@ -85,679 +80,6 @@ class UnifiedLogTest {
}
}
-
- @Test
- def testTransactionIndexUpdatedThroughReplication(): Unit = {
- val epoch = 0.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val buffer = ByteBuffer.allocate(2048)
-
- val pid1 = 1L
- val pid2 = 2L
- val pid3 = 3L
- val pid4 = 4L
-
- val appendPid1 = appendTransactionalToBuffer(buffer, pid1, epoch)
- val appendPid2 = appendTransactionalToBuffer(buffer, pid2, epoch)
- val appendPid3 = appendTransactionalToBuffer(buffer, pid3, epoch)
- val appendPid4 = appendTransactionalToBuffer(buffer, pid4, epoch)
-
- appendPid1(0L, 5)
- appendNonTransactionalToBuffer(buffer, 5L, 3)
- appendPid2(8L, 2)
- appendPid1(10L, 4)
- appendPid3(14L, 3)
- appendNonTransactionalToBuffer(buffer, 17L, 2)
- appendPid1(19L, 10)
- appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L,
ControlRecordType.ABORT)
- appendPid2(30L, 6)
- appendPid4(36L, 3)
- appendNonTransactionalToBuffer(buffer, 39L, 10)
- appendPid3(49L, 9)
- appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L,
ControlRecordType.COMMIT)
- appendPid4(59L, 8)
- appendPid2(67L, 7)
- appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L,
ControlRecordType.ABORT)
- appendNonTransactionalToBuffer(buffer, 75L, 10)
- appendPid4(85L, 4)
- appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L,
ControlRecordType.COMMIT)
-
- buffer.flip()
-
- appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch)
-
- val abortedTransactions = LogTestUtils.allAbortedTransactions(log)
- val expectedTransactions = List(
- new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
- new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
- )
-
- assertEquals(expectedTransactions, abortedTransactions)
-
- // Verify caching of the segment position of the first unstable offset
- log.updateHighWatermark(30L)
- assertCachedFirstUnstableOffset(log, expectedOffset = 8L)
-
- log.updateHighWatermark(75L)
- assertCachedFirstUnstableOffset(log, expectedOffset = 36L)
-
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
- private def assertCachedFirstUnstableOffset(log: UnifiedLog, expectedOffset:
Long): Unit = {
- assertTrue(log.producerStateManager.firstUnstableOffset.isPresent)
- val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get
- assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
- assertFalse(firstUnstableOffset.messageOffsetOnly)
- assertValidLogOffsetMetadata(log, firstUnstableOffset)
- }
-
- private def assertValidLogOffsetMetadata(log: UnifiedLog, offsetMetadata:
LogOffsetMetadata): Unit = {
- assertFalse(offsetMetadata.messageOffsetOnly)
-
- val segmentBaseOffset = offsetMetadata.segmentBaseOffset
- val segments = log.logSegments(segmentBaseOffset, segmentBaseOffset + 1)
- assertFalse(segments.isEmpty)
-
- val segment = segments.iterator().next()
- assertEquals(segmentBaseOffset, segment.baseOffset)
- assertTrue(offsetMetadata.relativePositionInSegment <= segment.size)
-
- val readInfo = segment.read(offsetMetadata.messageOffset,
- 2048,
- Optional.of(segment.size),
- false)
-
- if (offsetMetadata.relativePositionInSegment < segment.size)
- assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
- else
- assertNull(readInfo)
- }
-
- @Test
- def testZombieCoordinatorFenced(): Unit = {
- val pid = 1L
- val epoch = 0.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch,
mockTime)
-
- append(10)
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel())
-
- append(5)
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.COMMIT, mockTime.milliseconds(),
- coordinatorEpoch = 2, transactionVersion =
TransactionVersion.TV_0.featureLevel())
-
- assertThrows(
- classOf[TransactionCoordinatorFencedException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel()))
- }
-
- @Test
- def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
- val pid = 1L
- val epoch = 0.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val buffer = ByteBuffer.allocate(256)
- val append = appendTransactionalToBuffer(buffer, pid, epoch, 1)
- append(0, 10)
- appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L,
ControlRecordType.COMMIT, 1)
-
- buffer.flip()
- log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch)
-
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel())
- LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel())
- assertThrows(classOf[TransactionCoordinatorFencedException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 1, leaderEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel()))
- }
-
- @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with
transactionVersion={0}")
- @ValueSource(shorts = Array(1, 2))
- def testEndTxnWithFencedProducerEpoch(transactionVersion: Short): Unit = {
- val producerId = 1L
- val epoch = 5.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // First, write some transactional records to establish the current epoch
- val records = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, epoch, 0,
- new SimpleRecord("key".getBytes, "value".getBytes)
- )
- log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
-
- // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 and
TV2
- // TV0/TV1: markerEpoch < currentEpoch is rejected
- // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
- assertThrows(classOf[InvalidProducerEpochException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (epoch -
1).toShort,
- ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch =
1,
- leaderEpoch = 0, transactionVersion = transactionVersion))
-
- // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
- // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
- // TV2: same epoch is rejected (requires strict >, markerEpoch >
currentEpoch)
- if (transactionVersion >= 2) {
- // TV2: same epoch should be rejected
- assertThrows(classOf[InvalidProducerEpochException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch,
- ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch =
1,
- leaderEpoch = 0, transactionVersion = transactionVersion))
- } else {
- // TV0/TV1: same epoch should be allowed
- assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log,
producerId, epoch,
- ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch =
1,
- leaderEpoch = 0, transactionVersion = transactionVersion))
- }
- }
-
- @Test
- def testTV2MarkerWithBumpedEpochSucceeds(): Unit = {
- // Test that TV2 markers with bumped epochs (epoch + 1) are accepted
(positive case)
- // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so
markerEpoch = currentEpoch + 1
- val transactionVersion: Short = 2
- val producerId = 1L
- val epoch = 5.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // First, write some transactional records to establish the current epoch
- val records = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, epoch, 0,
- new SimpleRecord("key".getBytes, "value".getBytes)
- )
- log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion)
-
- // TV2: Verify that bumped epoch (epoch + 1) is accepted
- val bumpedEpoch = (epoch + 1).toShort
- assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log,
producerId, bumpedEpoch,
- ControlRecordType.COMMIT, mockTime.milliseconds(), coordinatorEpoch = 1,
- leaderEpoch = 0, transactionVersion =
TransactionVersion.TV_2.featureLevel()))
-
- // Verify the marker was successfully appended by checking producer state
- val producerState =
log.producerStateManager.activeProducers.get(producerId)
- assertNotNull(producerState)
- // After a commit marker, the producer epoch should be updated to the
bumped epoch for TV2
- assertEquals(bumpedEpoch, producerState.producerEpoch)
- }
-
- @Test
- def testReplicationWithTVUnknownAllowed(): Unit = {
- // Test that TV_UNKNOWN is allowed for replication (REPLICATION origin)
and uses TV_0 validation
- // This simulates the scenario where:
- // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2 and
validates with strict TV2 rules
- // 2. Leader writes MemoryRecords to log (transactionVersion is not stored
in MemoryRecords)
- // 3. Follower receives MemoryRecords via replication (without
transactionVersion metadata)
- // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more
permissive, safe because leader already validated)
-
- val producerId = 1L
- val epoch = 5.toShort
- val coordinatorEpoch = 1
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // Step 1: Write transactional records as leader to establish current epoch
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, epoch, 0,
- new SimpleRecord("key".getBytes, "value".getBytes)
- )
- log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel())
-
- // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch + 1)
- // This is what happens at the leader when WriteTxnMarkersRequest is
received
- val bumpedEpoch = (epoch + 1).toShort
- val leaderMarker = MemoryRecords.withEndTransactionMarker(
- mockTime.milliseconds(),
- producerId,
- bumpedEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
- )
- // Leader validates with TV2 (strict: markerEpoch > currentEpoch)
- log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel())
-
- // Verify leader state
- val leaderProducerState =
log.producerStateManager.activeProducers.get(producerId)
- assertNotNull(leaderProducerState)
- assertEquals(bumpedEpoch, leaderProducerState.producerEpoch)
-
- // Step 3: Create a new log to simulate a follower
- val followerLogDir = TestUtils.randomPartitionLogDir(tmpDir)
- val followerLog = createLog(followerLogDir, logConfig)
-
- // Step 4: Follower replicates transactional records first
- val followerTransactionalRecords = MemoryRecords.withTransactionalRecords(
- 0L,
- Compression.NONE, producerId, epoch, 0,
- 0,
- new SimpleRecord("key".getBytes, "value".getBytes)
- )
- followerLog.appendAsFollower(followerTransactionalRecords, 0)
-
- // Step 5: Follower replicates the marker (appendAsFollower uses
TV_UNKNOWN internally)
- // This should succeed because TV_UNKNOWN is allowed for REPLICATION origin
- // and defaults to TV_0 validation (markerEpoch >= currentEpoch), which is
more permissive
- // The marker should be at offset 1 (after the transactional record at
offset 0)
- val followerMarker = MemoryRecords.withEndTransactionMarker(
- 1L, // offset after the transactional record
- mockTime.milliseconds(),
- 0, // partition leader epoch
- producerId,
- bumpedEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
- )
-
- // This should not throw an exception - TV_UNKNOWN is allowed for
replication
- assertDoesNotThrow(() => followerLog.appendAsFollower(followerMarker, 0))
-
- // Verify follower state matches leader state
- val followerProducerState =
followerLog.producerStateManager.activeProducers.get(producerId)
- assertNotNull(followerProducerState)
- assertEquals(bumpedEpoch, followerProducerState.producerEpoch)
- assertEquals(coordinatorEpoch, followerProducerState.coordinatorEpoch)
-
- // Verify the marker was written to the follower log
- assertEquals(2L, followerLog.logEndOffset) // 1 transactional record + 1
marker
- }
-
- @Test
- def testLeaderRejectsTVUnknownForTransactionMarker(): Unit = {
- // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader writing
transaction markers)
- // TV_UNKNOWN is only allowed for REPLICATION origin (followers)
- val producerId = 1L
- val epoch = 5.toShort
- val coordinatorEpoch = 1
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // Write transactional records as leader to establish current epoch
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE, producerId, epoch, 0,
- new SimpleRecord("key".getBytes, "value".getBytes)
- )
- log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel())
-
- // Attempt to write a transaction marker with TV_UNKNOWN as COORDINATOR
(leader)
- // This should throw IllegalArgumentException because TV_UNKNOWN is not
allowed for COORDINATOR origin
- val marker = MemoryRecords.withEndTransactionMarker(
- mockTime.milliseconds(),
- producerId,
- (epoch + 1).toShort, // bumped epoch for TV2
- new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
- )
-
- val exception = assertThrows(classOf[IllegalArgumentException], () => {
- log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_UNKNOWN)
- })
-
- assertTrue(exception.getMessage.contains("transactionVersion must be
explicitly specified"))
- assertTrue(exception.getMessage.contains("TV_UNKNOWN"))
- assertTrue(exception.getMessage.contains("COORDINATOR"))
- }
-
- @Test
- def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val epoch = 0.toShort
- val pid = 1L
- val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch,
mockTime)
-
- appendPid(5)
- LogTestUtils.appendNonTransactionalAsLeader(log, 3)
- assertEquals(8L, log.logEndOffset)
-
- log.roll()
- assertEquals(2, log.logSegments.size)
- appendPid(5)
-
- assertEquals(Optional.of(0L), log.firstUnstableOffset)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(5L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- // the first unstable offset should be lower bounded by the log start
offset
- assertEquals(Optional.of(5L), log.firstUnstableOffset)
- }
-
- @Test
- def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion():
Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
- val epoch = 0.toShort
- val pid = 1L
- val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch,
mockTime)
-
- appendPid(5)
- LogTestUtils.appendNonTransactionalAsLeader(log, 3)
- assertEquals(8L, log.logEndOffset)
-
- log.roll()
- assertEquals(2, log.logSegments.size)
- appendPid(5)
-
- assertEquals(Optional.of(0L), log.firstUnstableOffset)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(8L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.updateHighWatermark(log.logEndOffset)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertEquals(1, log.logSegments.size)
-
- // the first unstable offset should be lower bounded by the log start
offset
- assertEquals(Optional.of(8L), log.firstUnstableOffset)
- }
-
- @Test
- def testAppendToTransactionIndexFailure(): Unit = {
- val pid = 1L
- val epoch = 0.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch,
mockTime)
- append(10)
-
- // Kind of a hack, but renaming the index to a directory ensures that the
append
- // to the index will fail.
- log.activeSegment.txnIndex.renameTo(log.dir)
-
- // The append will be written to the log successfully, but the write to
the index will fail
- assertThrows(
- classOf[KafkaStorageException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel()))
- assertEquals(11L, log.logEndOffset)
- assertEquals(0L, log.lastStableOffset)
-
- // Try the append a second time. The appended offset in the log should not
increase
- // because the log dir is marked as failed. Nor will there be a write to
the transaction
- // index.
- assertThrows(
- classOf[KafkaStorageException],
- () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
- coordinatorEpoch = 1, transactionVersion =
TransactionVersion.TV_0.featureLevel()))
- assertEquals(11L, log.logEndOffset)
- assertEquals(0L, log.lastStableOffset)
-
- // Even if the high watermark is updated, the first unstable offset does
not move
- log.updateHighWatermark(12L)
- assertEquals(0L, log.lastStableOffset)
-
- assertThrows(classOf[KafkaStorageException], () => log.close())
- val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false)
- assertEquals(11L, reopenedLog.logEndOffset)
- assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
- reopenedLog.updateHighWatermark(12L)
- assertEquals(Optional.empty, reopenedLog.firstUnstableOffset)
- }
-
- @Test
- def testOffsetSnapshot(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // append a few records
- appendAsFollower(
- log,
- MemoryRecords.withRecords(
- Compression.NONE,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes),
- new SimpleRecord("c".getBytes)
- ),
- 5
- )
-
-
- log.updateHighWatermark(3L)
- var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot
- assertEquals(offsets.highWatermark.messageOffset, 3L)
- assertFalse(offsets.highWatermark.messageOffsetOnly)
-
- offsets = log.fetchOffsetSnapshot
- assertEquals(offsets.highWatermark.messageOffset, 3L)
- assertFalse(offsets.highWatermark.messageOffsetOnly)
- }
-
- @Test
- def testLastStableOffsetWithMixedProducerData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 *
5)
- val log = createLog(logDir, logConfig)
-
- // for convenience, both producers share the same epoch
- val epoch = 5.toShort
-
- val pid1 = 137L
- val seq1 = 0
- val pid2 = 983L
- val seq2 = 0
-
- // add some transactional records
- val firstAppendInfo =
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid1, epoch, seq1,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes),
- new SimpleRecord("c".getBytes)), 0)
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // mix in some non-transactional data
- log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
- new SimpleRecord("g".getBytes),
- new SimpleRecord("h".getBytes),
- new SimpleRecord("i".getBytes)), 0)
-
- // append data from a second transactional producer
- val secondAppendInfo =
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid2, epoch, seq2,
- new SimpleRecord("d".getBytes),
- new SimpleRecord("e".getBytes),
- new SimpleRecord("f".getBytes)), 0)
-
- // LSO should not have changed
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // now first producer's transaction is aborted
- val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1,
epoch, ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
- log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
-
- // LSO should now point to one less than the first offset of the second
transaction
- assertEquals(Optional.of(secondAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // commit the second transaction
- val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2,
epoch, ControlRecordType.COMMIT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
- log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
-
- // now there should be no first unstable offset
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
- @Test
- def testAbortedTransactionSpanningMultipleSegments(): Unit = {
- val pid = 137L
- val epoch = 5.toShort
- var seq = 0
-
- val records = MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, seq,
- new SimpleRecord("a".getBytes),
- new SimpleRecord("b".getBytes),
- new SimpleRecord("c".getBytes))
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes =
records.sizeInBytes)
- val log = createLog(logDir, logConfig)
-
- val firstAppendInfo = log.appendAsLeader(records, 0)
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
-
- // this write should spill to the second segment
- seq = 3
-
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, seq,
- new SimpleRecord("d".getBytes),
- new SimpleRecord("e".getBytes),
- new SimpleRecord("f".getBytes)), 0)
- assertEquals(Optional.of(firstAppendInfo.firstOffset),
log.firstUnstableOffset)
- assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
-
- // now abort the transaction
- val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid,
epoch, ControlRecordType.ABORT,
- mockTime.milliseconds(), transactionVersion =
TransactionVersion.TV_0.featureLevel())
- log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
- assertEquals(Optional.empty, log.firstUnstableOffset)
-
- // now check that a fetch includes the aborted transaction
- val fetchDataInfo = log.read(0L, 2048, FetchIsolation.TXN_COMMITTED, true)
-
- assertTrue(fetchDataInfo.abortedTransactions.isPresent)
- assertEquals(1, fetchDataInfo.abortedTransactions.get.size)
- assertEquals(new
FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0),
fetchDataInfo.abortedTransactions.get.get(0))
- }
-
- @Test
- def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = {
- val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
- val logDir = new File(tmpDir, dirName)
- logDir.mkdirs()
- val logConfig = LogTestUtils.createLogConfig()
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments)
- }
-
- @Test
- def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = 512,
- segmentIndexBytes = 1000,
- retentionMs = 999
- )
- val log = createLog(logDir, logConfig)
-
- val expiredTimestamp = mockTime.milliseconds() - 1000
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes,
timestamp = expiredTimestamp)
- log.appendAsLeader(records, 0)
- }
-
- val initialHighWatermark = log.updateHighWatermark(25L)
- assertEquals(25L, initialHighWatermark)
-
- val initialNumSegments = log.numberOfSegments
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- assertTrue(log.numberOfSegments < initialNumSegments)
- assertTrue(log.logStartOffset <= initialHighWatermark)
- }
-
- @Test
- def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = 512,
- segmentIndexBytes = 1000,
- retentionMs = 999
- )
- val log = createLog(logDir, logConfig)
-
- val expiredTimestamp = mockTime.milliseconds() - 1000
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes,
timestamp = expiredTimestamp)
- log.appendAsLeader(records, 0)
- }
-
- // ensure we have at least a few segments so the test case is not trivial
- assertTrue(log.numberOfSegments > 5)
- assertEquals(0L, log.highWatermark)
- assertEquals(0L, log.logStartOffset)
- assertEquals(100L, log.logEndOffset)
-
- for (hw <- 0 to 100) {
- log.updateHighWatermark(hw)
- assertEquals(hw, log.highWatermark)
- log.deleteOldSegments()
- assertTrue(log.logStartOffset <= hw)
-
- // verify that all segments up to the high watermark have been deleted
- log.logSegments.asScala.headOption.foreach { segment =>
- assertTrue(segment.baseOffset <= hw)
- assertTrue(segment.baseOffset >= log.logStartOffset)
- }
- log.logSegments.asScala.tail.foreach { segment =>
- assertTrue(segment.baseOffset > hw)
- assertTrue(segment.baseOffset >= log.logStartOffset)
- }
- }
-
- assertEquals(100L, log.logStartOffset)
- assertEquals(1, log.numberOfSegments)
- assertEquals(0, log.activeSegment.size)
- }
-
- @Test
- def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = 512,
- segmentIndexBytes = 1000,
- retentionMs = 999
- )
- val log = createLog(logDir, logConfig)
-
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
- log.appendAsLeader(records, 0)
- }
-
- log.updateHighWatermark(25L)
- assertThrows(classOf[OffsetOutOfRangeException], () =>
log.maybeIncrementLogStartOffset(26L,
LogStartOffsetIncrementReason.ClientRecordDeletion))
- }
-
- @Test
- def testBackgroundDeletionWithIOException(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
- // Delete the underlying directory to trigger a KafkaStorageException
- val dir = log.dir
- Utils.delete(dir)
- Files.createFile(dir.toPath)
-
- assertThrows(classOf[KafkaStorageException], () => {
- log.delete()
- })
- assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
- }
-
- /**
- * test renaming a log's dir without reinitialization, which is the case
during topic deletion
- */
- @Test
- def testRenamingDirWithoutReinitialization(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
- assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
- val newDir = TestUtils.randomPartitionLogDir(tmpDir)
- assertTrue(newDir.exists())
-
- log.renameDir(newDir.getName, false)
- assertFalse(log.leaderEpochCache.nonEmpty)
- assertTrue(log.partitionMetadataFile.isEmpty)
- assertEquals(0, log.logEndOffset)
-
- // verify that the background deletion can succeed
- log.delete()
- assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
- assertFalse(newDir.exists())
- }
-
@Test
def testMaybeUpdateHighWatermarkAsFollower(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
@@ -1650,47 +972,6 @@ class UnifiedLogTest {
assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()),
result)
}
- private def appendTransactionalToBuffer(buffer: ByteBuffer,
- producerId: Long,
- producerEpoch: Short,
- leaderEpoch: Int = 0): (Long, Int)
=> Unit = {
- var sequence = 0
- (offset: Long, numRecords: Int) => {
- val builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
- offset, mockTime.milliseconds(), producerId, producerEpoch, sequence,
true, leaderEpoch)
- for (seq <- sequence until sequence + numRecords) {
- val record = new SimpleRecord(s"$seq".getBytes)
- builder.append(record)
- }
-
- sequence += numRecords
- builder.close()
- }
- }
-
- private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
- producerId: Long,
- producerEpoch: Short,
- offset: Long,
- controlType: ControlRecordType,
- coordinatorEpoch: Int = 0,
- leaderEpoch: Int = 0): Unit = {
- val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
- MemoryRecords.writeEndTransactionalMarker(buffer, offset,
mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker)
- }
-
- private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long,
numRecords: Int): Unit = {
- val builder = MemoryRecords.builder(buffer, Compression.NONE,
TimestampType.CREATE_TIME, offset)
- (0 until numRecords).foreach { seq =>
- builder.append(new SimpleRecord(s"$seq".getBytes))
- }
- builder.close()
- }
-
- private def appendAsFollower(log: UnifiedLog, records: MemoryRecords,
leaderEpoch: Int): Unit = {
- records.batches.forEach(_.setPartitionLeaderEpoch(leaderEpoch))
- log.appendAsFollower(records, leaderEpoch)
- }
private def createLog(dir: File,
config: LogConfig,
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index e9efb9e5af7..67f8cb61e50 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.DefaultRecordBatch;
@@ -204,6 +205,14 @@ public class LogTestUtils {
return records(records, magicValue, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}
+ public static List<AbortedTxn> allAbortedTransactions(UnifiedLog log) {
+ List<AbortedTxn> result = new ArrayList<>();
+ for (LogSegment segment : log.logSegments()) {
+ result.addAll(segment.txnIndex().allAbortedTxns());
+ }
+ return result;
+ }
+
public static void deleteProducerSnapshotFiles(File logDir) {
Stream.of(logDir.listFiles())
.filter(f -> f.isFile() &&
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index f382fe803a3..bcefc6679aa 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -30,13 +30,16 @@ import
org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.DescribeProducersResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.CompressionType;
import org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.DefaultRecordBatch;
+import org.apache.kafka.common.record.internal.EndTransactionMarker;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
import org.apache.kafka.common.record.internal.MemoryRecords;
@@ -51,6 +54,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.RequestLocal;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
@@ -4343,4 +4347,667 @@ public class UnifiedLogTest {
.getValue();
return gauge.value();
}
+
+ @Test
+ public void testTransactionIndexUpdatedThroughReplication() throws
IOException {
+ short epoch = 0;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ long pid1 = 1L;
+ long pid2 = 2L;
+ long pid3 = 3L;
+ long pid4 = 4L;
+
+ BiConsumer<Long, Integer> appendPid1 =
appendTransactionalToBuffer(buffer, pid1, epoch);
+ BiConsumer<Long, Integer> appendPid2 =
appendTransactionalToBuffer(buffer, pid2, epoch);
+ BiConsumer<Long, Integer> appendPid3 =
appendTransactionalToBuffer(buffer, pid3, epoch);
+ BiConsumer<Long, Integer> appendPid4 =
appendTransactionalToBuffer(buffer, pid4, epoch);
+
+ appendPid1.accept(0L, 5);
+ appendNonTransactionalToBuffer(buffer, 5L, 3);
+ appendPid2.accept(8L, 2);
+ appendPid1.accept(10L, 4);
+ appendPid3.accept(14L, 3);
+ appendNonTransactionalToBuffer(buffer, 17L, 2);
+ appendPid1.accept(19L, 10);
+ appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L,
ControlRecordType.ABORT);
+ appendPid2.accept(30L, 6);
+ appendPid4.accept(36L, 3);
+ appendNonTransactionalToBuffer(buffer, 39L, 10);
+ appendPid3.accept(49L, 9);
+ appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L,
ControlRecordType.COMMIT);
+ appendPid4.accept(59L, 8);
+ appendPid2.accept(67L, 7);
+ appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L,
ControlRecordType.ABORT);
+ appendNonTransactionalToBuffer(buffer, 75L, 10);
+ appendPid4.accept(85L, 4);
+ appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L,
ControlRecordType.COMMIT);
+
+ buffer.flip();
+
+ appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch);
+
+ List<AbortedTxn> abortedTransactions =
LogTestUtils.allAbortedTransactions(log);
+ List<AbortedTxn> expectedTransactions = List.of(
+ new
AbortedTxn().setProducerId(pid1).setFirstOffset(0L).setLastOffset(29L).setLastStableOffset(8L),
+ new
AbortedTxn().setProducerId(pid2).setFirstOffset(8L).setLastOffset(74L).setLastStableOffset(36L)
+ );
+
+ assertEquals(expectedTransactions, abortedTransactions);
+
+ // Verify caching of the segment position of the first unstable offset
+ log.updateHighWatermark(30L);
+ assertCachedFirstUnstableOffset(log, 8L);
+
+ log.updateHighWatermark(75L);
+ assertCachedFirstUnstableOffset(log, 36L);
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+ }
+
+ @Test
+ public void testZombieCoordinatorFenced() throws IOException {
+ long pid = 1L;
+ short epoch = 0;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ Consumer<Integer> append =
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+ append.accept(10);
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 1, 0, TransactionVersion.TV_0.featureLevel());
+
+ append.accept(5);
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.COMMIT, mockTime.milliseconds(),
+ 2, 0, TransactionVersion.TV_0.featureLevel());
+
+ assertThrows(TransactionCoordinatorFencedException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 1, 0, TransactionVersion.TV_0.featureLevel()));
+ }
+
+ @Test
+ public void testZombieCoordinatorFencedEmptyTransaction() throws
IOException {
+ long pid = 1L;
+ short epoch = 0;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ BiConsumer<Long, Integer> append = appendTransactionalToBuffer(buffer,
pid, epoch, 1);
+ append.accept(0L, 10);
+ appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L,
ControlRecordType.COMMIT, 1);
+
+ buffer.flip();
+ log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch);
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 2, 1, TransactionVersion.TV_0.featureLevel());
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 2, 1, TransactionVersion.TV_0.featureLevel());
+ assertThrows(TransactionCoordinatorFencedException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 1, 1, TransactionVersion.TV_0.featureLevel()));
+ }
+
+ @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with
transactionVersion={0}")
+ @ValueSource(shorts = {0, 1, 2})
+ public void testEndTxnWithFencedProducerEpoch(short transactionVersion)
throws IOException {
+ long producerId = 1L;
+ short epoch = 5;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // First, write some transactional records to establish the current
epoch
+ MemoryRecords records = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, epoch, 0,
+ new SimpleRecord("key".getBytes(), "value".getBytes())
+ );
+ log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+ // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1
and TV2
+ // TV0/TV1: markerEpoch < currentEpoch is rejected
+ // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
+ assertThrows(InvalidProducerEpochException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId,
(short) (epoch - 1),
+ ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0,
transactionVersion));
+
+ // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
+ // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
+ // TV2: same epoch is rejected (requires strict >, markerEpoch >
currentEpoch)
+ if (transactionVersion >= 2) {
+ // TV2: same epoch should be rejected
+ assertThrows(InvalidProducerEpochException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId,
epoch,
+ ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0,
transactionVersion));
+ } else {
+ // TV0/TV1: same epoch should be allowed
+ assertDoesNotThrow(() ->
LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch,
+ ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0,
transactionVersion));
+ }
+ }
+
+ @Test
+ public void testTV2MarkerWithBumpedEpochSucceeds() throws IOException {
+ // Test that TV2 markers with bumped epochs (epoch + 1) are accepted
(positive case)
+ // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so
markerEpoch = currentEpoch + 1
+ short transactionVersion = 2;
+ long producerId = 1L;
+ short epoch = 5;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // First, write some transactional records to establish the current
epoch
+ MemoryRecords records = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, epoch, 0,
+ new SimpleRecord("key".getBytes(), "value".getBytes())
+ );
+ log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+ // TV2: Verify that bumped epoch (epoch + 1) is accepted
+ short bumpedEpoch = (short) (epoch + 1);
+ assertDoesNotThrow(() -> LogTestUtils.appendEndTxnMarkerAsLeader(log,
producerId, bumpedEpoch,
+ ControlRecordType.COMMIT, mockTime.milliseconds(), 1,
+ 0, TransactionVersion.TV_2.featureLevel()));
+
+ // Verify the marker was successfully appended by checking producer
state
+ ProducerStateEntry producerState =
log.producerStateManager().activeProducers().get(producerId);
+ assertNotNull(producerState);
+ // After a commit marker, the producer epoch should be updated to the
bumped epoch for TV2
+ assertEquals(bumpedEpoch, producerState.producerEpoch());
+ }
+
+ @Test
+ public void testReplicationWithTVUnknownAllowed() throws IOException {
+ // Test that TV_UNKNOWN is allowed for replication (REPLICATION
origin) and uses TV_0 validation
+ // This simulates the scenario where:
+ // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2
and validates with strict TV2 rules
+ // 2. Leader writes MemoryRecords to log (transactionVersion is not
stored in MemoryRecords)
+ // 3. Follower receives MemoryRecords via replication (without
transactionVersion metadata)
+ // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more
permissive, safe because leader already validated)
+ long producerId = 1L;
+ short epoch = 5;
+ int coordinatorEpoch = 1;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // Step 1: Write transactional records as leader to establish current
epoch
+ MemoryRecords transactionalRecords =
MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, epoch, 0,
+ new SimpleRecord("key".getBytes(), "value".getBytes())
+ );
+ log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel());
+
+ // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch
+ 1)
+ // This is what happens at the leader when WriteTxnMarkersRequest is
received
+ short bumpedEpoch = (short) (epoch + 1);
+ MemoryRecords leaderMarker = MemoryRecords.withEndTransactionMarker(
+ mockTime.milliseconds(),
+ producerId,
+ bumpedEpoch,
+ new EndTransactionMarker(ControlRecordType.COMMIT,
coordinatorEpoch)
+ );
+ // Leader validates with TV2 (strict: markerEpoch > currentEpoch)
+ log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel());
+
+ // Verify leader state
+ ProducerStateEntry leaderProducerState =
log.producerStateManager().activeProducers().get(producerId);
+ assertNotNull(leaderProducerState);
+ assertEquals(bumpedEpoch, leaderProducerState.producerEpoch());
+
+ // Step 3: Create a new log to simulate a follower
+ File followerLogDir = TestUtils.randomPartitionLogDir(tmpDir);
+ UnifiedLog followerLog = createLog(followerLogDir, logConfig);
+
+ // Step 4: Follower replicates transactional records first
+ MemoryRecords followerTransactionalRecords =
MemoryRecords.withTransactionalRecords(
+ 0L, Compression.NONE, producerId, epoch, 0, 0,
+ new SimpleRecord("key".getBytes(), "value".getBytes())
+ );
+ followerLog.appendAsFollower(followerTransactionalRecords, 0);
+
+ // Step 5: Follower replicates the marker (appendAsFollower uses
TV_UNKNOWN internally)
+ // This should succeed because TV_UNKNOWN is allowed for REPLICATION
origin
+ // and defaults to TV_0 validation (markerEpoch >= currentEpoch),
which is more permissive
+ // The marker should be at offset 1 (after the transactional record at
offset 0)
+ MemoryRecords followerMarker = MemoryRecords.withEndTransactionMarker(
+ 1L, // offset after the transactional record
+ mockTime.milliseconds(),
+ 0, // partition leader epoch
+ producerId,
+ bumpedEpoch,
+ new EndTransactionMarker(ControlRecordType.COMMIT,
coordinatorEpoch)
+ );
+
+ // This should not throw an exception - TV_UNKNOWN is allowed for
replication
+ assertDoesNotThrow(() -> followerLog.appendAsFollower(followerMarker,
0));
+
+ // Verify follower state matches leader state
+ ProducerStateEntry followerProducerState =
followerLog.producerStateManager().activeProducers().get(producerId);
+ assertNotNull(followerProducerState);
+ assertEquals(bumpedEpoch, followerProducerState.producerEpoch());
+ assertEquals(coordinatorEpoch,
followerProducerState.coordinatorEpoch());
+
+ // Verify the marker was written to the follower log
+ assertEquals(2L, followerLog.logEndOffset()); // 1 transactional
record + 1 marker
+ }
+
+ @Test
+ public void testLeaderRejectsTVUnknownForTransactionMarker() throws
IOException {
+ // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader
writing transaction markers)
+ // TV_UNKNOWN is only allowed for REPLICATION origin (followers)
+ long producerId = 1L;
+ short epoch = 5;
+ int coordinatorEpoch = 1;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // Write transactional records as leader to establish current epoch
+ MemoryRecords transactionalRecords =
MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, epoch, 0,
+ new SimpleRecord("key".getBytes(), "value".getBytes())
+ );
+ log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel());
+
+ // Attempt to write a transaction marker with TV_UNKNOWN as
COORDINATOR (leader)
+ // This should throw IllegalArgumentException because TV_UNKNOWN is
not allowed for COORDINATOR origin
+ MemoryRecords marker = MemoryRecords.withEndTransactionMarker(
+ mockTime.milliseconds(),
+ producerId,
+ (short) (epoch + 1), // bumped epoch for TV2
+ new EndTransactionMarker(ControlRecordType.COMMIT,
coordinatorEpoch)
+ );
+
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
+ log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_UNKNOWN));
+
+ assertTrue(exception.getMessage().contains("transactionVersion must be
explicitly specified"));
+ assertTrue(exception.getMessage().contains("TV_UNKNOWN"));
+ assertTrue(exception.getMessage().contains("COORDINATOR"));
+ }
+
+ @Test
+ public void testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment()
throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+ short epoch = 0;
+ long pid = 1L;
+ Consumer<Integer> appendPid =
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+ appendPid.accept(5);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+ assertEquals(8L, log.logEndOffset());
+
+ log.roll();
+ assertEquals(2, log.logSegments().size());
+ appendPid.accept(5);
+
+ assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(5L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+ // the first unstable offset should be lower bounded by the log start
offset
+ assertEquals(Optional.of(5L), log.firstUnstableOffset());
+ }
+
+ @Test
+ public void
testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+ short epoch = 0;
+ long pid = 1L;
+ Consumer<Integer> appendPid =
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+ appendPid.accept(5);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+ assertEquals(8L, log.logEndOffset());
+
+ log.roll();
+ assertEquals(2, log.logSegments().size());
+ appendPid.accept(5);
+
+ assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(8L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertEquals(1, log.logSegments().size());
+
+ // the first unstable offset should be lower bounded by the log start
offset
+ assertEquals(Optional.of(8L), log.firstUnstableOffset());
+ }
+
+ @Test
+ public void testAppendToTransactionIndexFailure() throws IOException {
+ long pid = 1L;
+ short epoch = 0;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ Consumer<Integer> append =
LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+ append.accept(10);
+
+ // Kind of a hack, but renaming the index to a directory ensures that
the append
+ // to the index will fail.
+ log.activeSegment().txnIndex().renameTo(log.dir());
+
+ // The append will be written to the log successfully, but the write
to the index will fail
+ assertThrows(KafkaStorageException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 1, 0, TransactionVersion.TV_0.featureLevel()));
+ assertEquals(11L, log.logEndOffset());
+ assertEquals(0L, log.lastStableOffset());
+
+ // Try the append a second time. The appended offset in the log should
not increase
+ // because the log dir is marked as failed. Nor will there be a write
to the transaction
+ // index.
+ assertThrows(KafkaStorageException.class,
+ () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 1, 0, TransactionVersion.TV_0.featureLevel()));
+ assertEquals(11L, log.logEndOffset());
+ assertEquals(0L, log.lastStableOffset());
+
+ // Even if the high watermark is updated, the first unstable offset
does not move
+ log.updateHighWatermark(12L);
+ assertEquals(0L, log.lastStableOffset());
+
+ assertThrows(KafkaStorageException.class, () -> log.close());
+ UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L,
brokerTopicStats, mockTime.scheduler, mockTime,
+ producerStateManagerConfig, false, Optional.empty(), false);
+ assertEquals(11L, reopenedLog.logEndOffset());
+ assertEquals(1,
reopenedLog.activeSegment().txnIndex().allAbortedTxns().size());
+ reopenedLog.updateHighWatermark(12L);
+ assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+ }
+
+ @Test
+ public void testOffsetSnapshot() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // append a few records
+ appendAsFollower(
+ log,
+ MemoryRecords.withRecords(
+ Compression.NONE,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes()),
+ new SimpleRecord("c".getBytes())
+ ),
+ 5
+ );
+
+ log.updateHighWatermark(3L);
+ LogOffsetSnapshot offsets = log.fetchOffsetSnapshot();
+ assertEquals(3L, offsets.highWatermark().messageOffset);
+ assertFalse(offsets.highWatermark().messageOffsetOnly());
+
+ offsets = log.fetchOffsetSnapshot();
+ assertEquals(3L, offsets.highWatermark().messageOffset);
+ assertFalse(offsets.highWatermark().messageOffsetOnly());
+ }
+
+ @Test
+ public void testLastStableOffsetWithMixedProducerData() throws IOException
{
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ log = createLog(logDir, logConfig);
+
+ // for convenience, both producers share the same epoch
+ short epoch = 5;
+
+ long pid1 = 137L;
+ int seq1 = 0;
+ long pid2 = 983L;
+ int seq2 = 0;
+
+ // add some transactional records
+ LogAppendInfo firstAppendInfo =
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid1, epoch, seq1,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes()),
+ new SimpleRecord("c".getBytes())), 0);
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()),
log.firstUnstableOffset());
+
+ // mix in some non-transactional data
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("g".getBytes()),
+ new SimpleRecord("h".getBytes()),
+ new SimpleRecord("i".getBytes())), 0);
+
+ // append data from a second transactional producer
+ LogAppendInfo secondAppendInfo =
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid2, epoch, seq2,
+ new SimpleRecord("d".getBytes()),
+ new SimpleRecord("e".getBytes()),
+ new SimpleRecord("f".getBytes())), 0);
+
+ // LSO should not have changed
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()),
log.firstUnstableOffset());
+
+ // now first producer's transaction is aborted
+ LogAppendInfo abortAppendInfo =
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch,
ControlRecordType.ABORT,
+ mockTime.milliseconds(), 0, 0,
TransactionVersion.TV_0.featureLevel());
+ log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+ // LSO should now point to one less than the first offset of the
second transaction
+ assertEquals(Optional.of(secondAppendInfo.firstOffset()),
log.firstUnstableOffset());
+
+ // commit the second transaction
+ LogAppendInfo commitAppendInfo =
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch,
ControlRecordType.COMMIT,
+ mockTime.milliseconds(), 0, 0,
TransactionVersion.TV_0.featureLevel());
+ log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+ // now there should be no first unstable offset
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+ }
+
+ @Test
+ public void testAbortedTransactionSpanningMultipleSegments() throws
IOException {
+ long pid = 137L;
+ short epoch = 5;
+ int seq = 0;
+
+ MemoryRecords records =
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+ new SimpleRecord("a".getBytes()),
+ new SimpleRecord("b".getBytes()),
+ new SimpleRecord("c".getBytes()));
+
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(records.sizeInBytes()).build();
+ log = createLog(logDir, logConfig);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()),
log.firstUnstableOffset());
+
+ // this write should spill to the second segment
+
log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE,
pid, epoch, 3,
+ new SimpleRecord("d".getBytes()),
+ new SimpleRecord("e".getBytes()),
+ new SimpleRecord("f".getBytes())), 0);
+ assertEquals(Optional.of(firstAppendInfo.firstOffset()),
log.firstUnstableOffset());
+ assertEquals(3L, log.logEndOffsetMetadata().segmentBaseOffset);
+
+ // now abort the transaction
+ LogAppendInfo abortAppendInfo =
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
ControlRecordType.ABORT,
+ mockTime.milliseconds(), 0, 0,
TransactionVersion.TV_0.featureLevel());
+ log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+ // now check that a fetch includes the aborted transaction
+ FetchDataInfo fetchDataInfo = log.read(0L, 2048,
FetchIsolation.TXN_COMMITTED, true);
+
+ assertTrue(fetchDataInfo.abortedTransactions.isPresent());
+ assertEquals(1, fetchDataInfo.abortedTransactions.get().size());
+ assertEquals(new
FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0),
fetchDataInfo.abortedTransactions.get().get(0));
+ }
+
+ @Test
+ public void testLoadPartitionDirWithNoSegmentsShouldNotThrow() throws
IOException {
+ String dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo",
3));
+ File testLogDir = new File(tmpDir, dirName);
+ testLogDir.mkdirs();
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ UnifiedLog testLog = createLog(testLogDir, logConfig);
+ assertEquals(1, testLog.numberOfSegments());
+ }
+
+ @Test
+ public void testSegmentDeletionWithHighWatermarkInitialization() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build();
+ log = createLog(logDir, logConfig);
+
+ long expiredTimestamp = mockTime.milliseconds() - 1000;
+ for (int i = 0; i < 100; i++) {
+ MemoryRecords records = LogTestUtils.singletonRecords(("test" +
i).getBytes(), Compression.NONE, null, expiredTimestamp);
+ log.appendAsLeader(records, 0);
+ }
+
+ long initialHighWatermark = log.updateHighWatermark(25L);
+ assertEquals(25L, initialHighWatermark);
+
+ int initialNumSegments = log.numberOfSegments();
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ assertTrue(log.numberOfSegments() < initialNumSegments);
+ assertTrue(log.logStartOffset() <= initialHighWatermark);
+ }
+
+ @Test
+ public void testCannotDeleteSegmentsAtOrAboveHighWatermark() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build();
+ log = createLog(logDir, logConfig);
+
+ long expiredTimestamp = mockTime.milliseconds() - 1000;
+ for (int i = 0; i < 100; i++) {
+ MemoryRecords records = LogTestUtils.singletonRecords(("test" +
i).getBytes(), Compression.NONE, null, expiredTimestamp);
+ log.appendAsLeader(records, 0);
+ }
+
+ // ensure we have at least a few segments so the test case is not
trivial
+ assertTrue(log.numberOfSegments() > 5);
+ assertEquals(0L, log.highWatermark());
+ assertEquals(0L, log.logStartOffset());
+ assertEquals(100L, log.logEndOffset());
+
+ for (int hw = 0; hw <= 100; hw++) {
+ log.updateHighWatermark(hw);
+ assertEquals(hw, log.highWatermark());
+ log.deleteOldSegments();
+ assertTrue(log.logStartOffset() <= hw);
+
+ // verify that all segments up to the high watermark have been
deleted
+ List<LogSegment> segments = log.logSegments();
+ if (!segments.isEmpty()) {
+ assertTrue(segments.get(0).baseOffset() <= hw);
+ assertTrue(segments.get(0).baseOffset() >=
log.logStartOffset());
+ }
+ for (int i = 1; i < segments.size(); i++) {
+ assertTrue(segments.get(i).baseOffset() > hw);
+ assertTrue(segments.get(i).baseOffset() >=
log.logStartOffset());
+ }
+ }
+
+ assertEquals(100L, log.logStartOffset());
+ assertEquals(1, log.numberOfSegments());
+ assertEquals(0, log.activeSegment().size());
+ }
+
+ @Test
+ public void testCannotIncrementLogStartOffsetPastHighWatermark() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).build();
+ log = createLog(logDir, logConfig);
+
+ for (int i = 0; i < 100; i++) {
+ MemoryRecords records = LogTestUtils.singletonRecords(("test" +
i).getBytes(), null);
+ log.appendAsLeader(records, 0);
+ }
+
+ log.updateHighWatermark(25L);
+ assertThrows(OffsetOutOfRangeException.class,
+ () -> log.maybeIncrementLogStartOffset(26L,
LogStartOffsetIncrementReason.ClientRecordDeletion));
+ }
+
+ @Test
+ public void testBackgroundDeletionWithIOException() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build();
+ log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "The number of segments should
be 1");
+
+ // Delete the underlying directory to trigger a KafkaStorageException
+ File dir = log.dir();
+ Utils.delete(dir);
+ Files.createFile(dir.toPath());
+
+ assertThrows(KafkaStorageException.class, () -> log.delete());
+
assertTrue(log.logDirFailureChannel().hasOfflineLogDir(tmpDir.toString()));
+ }
+
+ /**
+ * test renaming a log's dir without reinitialization, which is the case
during topic deletion
+ */
+ @Test
+ public void testRenamingDirWithoutReinitialization() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build();
+ log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "The number of segments should
be 1");
+
+ File newDir = TestUtils.randomPartitionLogDir(tmpDir);
+ assertTrue(newDir.exists());
+
+ log.renameDir(newDir.getName(), false);
+ assertFalse(log.leaderEpochCache().nonEmpty());
+ assertTrue(log.partitionMetadataFile().isEmpty());
+ assertEquals(0, log.logEndOffset());
+
+ // verify that the background deletion can succeed
+ log.delete();
+ assertEquals(0, log.numberOfSegments(), "The number of segments should
be 0");
+ assertFalse(newDir.exists());
+ }
+
+ private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer
buffer, long producerId, short producerEpoch) {
+ return appendTransactionalToBuffer(buffer, producerId, producerEpoch,
0);
+ }
+
+ private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer
buffer, long producerId, short producerEpoch, int leaderEpoch) {
+ int[] sequence = {0};
+ return (offset, numRecords) -> {
+ int baseSequence = sequence[0];
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
+ offset, mockTime.milliseconds(), producerId, producerEpoch,
baseSequence, true, leaderEpoch);
+ for (int seq = baseSequence; seq < baseSequence + numRecords;
seq++) {
+ builder.append(new
SimpleRecord(String.valueOf(seq).getBytes()));
+ }
+ sequence[0] += numRecords;
+ builder.close();
+ };
+ }
+
+ private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long
producerId, short producerEpoch,
+ long offset, ControlRecordType
controlType) {
+ appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset,
controlType, 0, 0);
+ }
+
+ private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long
producerId, short producerEpoch,
+ long offset, ControlRecordType
controlType,
+ int coordinatorEpoch) {
+ appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset,
controlType, coordinatorEpoch, 0);
+ }
+
+ private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long
producerId, short producerEpoch,
+ long offset, ControlRecordType
controlType,
+ int coordinatorEpoch, int
leaderEpoch) {
+ EndTransactionMarker marker = new EndTransactionMarker(controlType,
coordinatorEpoch);
+ MemoryRecords.writeEndTransactionalMarker(buffer, offset,
mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker);
+ }
+
+ private void appendNonTransactionalToBuffer(ByteBuffer buffer, long
offset, int numRecords) {
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
Compression.NONE, TimestampType.CREATE_TIME, offset);
+ for (int seq = 0; seq < numRecords; seq++) {
+ builder.append(new SimpleRecord(String.valueOf(seq).getBytes()));
+ }
+ builder.close();
+ }
}