This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 07c87a8322a KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21807)
07c87a8322a is described below
commit 07c87a8322ab85ba474a35acc1458e6701f45f00
Author: Ken Huang <[email protected]>
AuthorDate: Tue Mar 31 17:02:24 2026 +0800
KAFKA-19752 Move parts of UnifiedLogTest to storage module (#21807)
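Moves the tests from testMaybeUpdateHighWatermarkAsFollower through
testNonZeroSequenceOnFirstAppendNonZeroEpoch (inclusive) out of the Scala
UnifiedLogTest in core; their Java equivalents are added to UnifiedLogTest
in the storage module.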
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 497 +-------------------
.../storage/internals/log/UnifiedLogTest.java | 510 ++++++++++++++++++++-
2 files changed, 499 insertions(+), 508 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 12be8551252..69c3e3efb69 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -20,25 +20,19 @@ package kafka.log
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors._
-import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.internal._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{MockTime, Scheduler}
-
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig,
UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig,
LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments,
OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doThrow, spy}
@@ -80,493 +74,6 @@ class UnifiedLogTest {
}
}
- @Test
- def testMaybeUpdateHighWatermarkAsFollower(): Unit = {
- val logConfig = LogTestUtils.createLogConfig()
- val log = createLog(logDir, logConfig)
-
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
- log.appendAsLeader(records, 0)
- }
-
- assertEquals(Optional.of(99L), log.maybeUpdateHighWatermark(99L))
- assertEquals(Optional.empty, log.maybeUpdateHighWatermark(99L))
-
- assertEquals(Optional.of(100L), log.maybeUpdateHighWatermark(100L))
- assertEquals(Optional.empty, log.maybeUpdateHighWatermark(100L))
-
- // bound by the log end offset
- assertEquals(Optional.empty, log.maybeUpdateHighWatermark(101L))
- }
-
- @Test
- def testEnableRemoteLogStorageOnCompactedTopics(): Unit = {
- var logConfig = LogTestUtils.createLogConfig()
- var log = createLog(logDir, logConfig)
- assertFalse(log.remoteLogEnabled())
-
- log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- assertFalse(log.remoteLogEnabled())
-
- logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
- log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- assertTrue(log.remoteLogEnabled())
-
- logConfig = LogTestUtils.createLogConfig(cleanupPolicy =
TopicConfig.CLEANUP_POLICY_COMPACT, remoteLogStorageEnable = true)
- log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- assertFalse(log.remoteLogEnabled())
-
- logConfig = LogTestUtils.createLogConfig(cleanupPolicy =
TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE,
- remoteLogStorageEnable = true)
- log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
- assertFalse(log.remoteLogEnabled())
- }
-
- @Test
- def testRemoteLogStorageIsDisabledOnInternalAndRemoteLogMetadataTopic():
Unit = {
- val partitions =
Seq(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
- Topic.TRANSACTION_STATE_TOPIC_NAME, Topic.TRANSACTION_STATE_TOPIC_NAME)
- .map(topic => new TopicPartition(topic, 0))
- for (partition <- partitions) {
- val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable =
true)
- val internalLogDir = new File(TestUtils.tempDir(), partition.toString)
- internalLogDir.mkdir()
- val log = createLog(internalLogDir, logConfig,
remoteStorageSystemEnable = true)
- assertFalse(log.remoteLogEnabled())
- }
- }
-
- @Test
- def testNoOpWhenRemoteLogStorageIsDisabled(): Unit = {
- val logConfig = LogTestUtils.createLogConfig()
- val log = createLog(logDir, logConfig)
-
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
- log.appendAsLeader(records, 0)
- }
-
- log.updateHighWatermark(90L)
- log.maybeIncrementLogStartOffset(20L,
LogStartOffsetIncrementReason.SegmentDeletion)
- assertEquals(20, log.logStartOffset)
- }
-
- @Test
- def testStartOffsetsRemoteLogStorageIsEnabled(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- for (i <- 0 until 100) {
- val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
- log.appendAsLeader(records, 0)
- }
-
- log.updateHighWatermark(80L)
- val newLogStartOffset = 40L
- log.maybeIncrementLogStartOffset(newLogStartOffset,
LogStartOffsetIncrementReason.SegmentDeletion)
- assertEquals(newLogStartOffset, log.logStartOffset)
- assertEquals(log.logStartOffset, log.localLogStartOffset())
-
- // Truncate the local log and verify that the offsets are updated to
expected values
- val newLocalLogStartOffset = 60L
- log.truncateFullyAndStartAt(newLocalLogStartOffset,
Optional.of(newLogStartOffset))
- assertEquals(newLogStartOffset, log.logStartOffset)
- assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
- }
-
- private class MockLogOffsetsListener extends LogOffsetsListener {
- private var highWatermark: Long = -1L
-
- override def onHighWatermarkUpdated(offset: Long): Unit = {
- highWatermark = offset
- }
-
- private def clear(): Unit = {
- highWatermark = -1L
- }
-
- /**
- * Verifies the callbacks that have been triggered since the last
- * verification. Values different than `-1` are the ones that have
- * been updated.
- */
- def verify(expectedHighWatermark: Long = -1L): Unit = {
- assertEquals(expectedHighWatermark, highWatermark,
- "Unexpected high watermark")
- clear()
- }
- }
-
- @Test
- def testLogOffsetsListener(): Unit = {
- def records(offset: Long): MemoryRecords = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- ), baseOffset = offset, partitionLeaderEpoch = 0)
-
- val listener = new MockLogOffsetsListener()
- listener.verify()
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig, logOffsetsListener = listener)
-
- listener.verify(expectedHighWatermark = 0)
-
- log.appendAsLeader(records(0), 0)
- log.appendAsLeader(records(0), 0)
-
- log.maybeIncrementHighWatermark(new LogOffsetMetadata(4))
- listener.verify(expectedHighWatermark = 4)
-
- log.truncateTo(3)
- listener.verify(expectedHighWatermark = 3)
-
- log.appendAsLeader(records(0), 0)
- log.truncateFullyAndStartAt(4, Optional.empty)
- listener.verify(expectedHighWatermark = 4)
- }
-
- @Test
- def testUpdateLogOffsetsListener(): Unit = {
- def records(offset: Long): MemoryRecords = TestUtils.records(List(
- new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "b".getBytes, "value".getBytes),
- new SimpleRecord(mockTime.milliseconds, "c".getBytes, "value".getBytes)
- ), baseOffset = offset, partitionLeaderEpoch = 0)
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(records(0), 0)
- log.maybeIncrementHighWatermark(new LogOffsetMetadata(2))
- log.maybeIncrementLogStartOffset(1,
LogStartOffsetIncrementReason.SegmentDeletion)
-
- val listener = new MockLogOffsetsListener()
- listener.verify()
-
- log.setLogOffsetsListener(listener)
- listener.verify() // it is still empty because we don't call the listener
when it is set.
-
- log.appendAsLeader(records(0), 0)
- log.maybeIncrementHighWatermark(new LogOffsetMetadata(4))
- listener.verify(expectedHighWatermark = 4)
- }
-
- @ParameterizedTest
- @EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT",
"COORDINATOR"))
- def testTransactionIsOngoingAndVerificationGuardTV2(appendOrigin:
AppendOrigin): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- // For TV2, when there's no existing producer state, sequence must be 0
for both CLIENT and COORDINATOR
- var sequence = 0
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
-
- val idempotentRecords = MemoryRecords.withIdempotentRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
-
- // Only clients have nonzero sequences
- if (appendOrigin == AppendOrigin.CLIENT)
- sequence = sequence + 2
-
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
-
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
sequence, producerEpoch, true)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- log.appendAsLeader(idempotentRecords, 0, appendOrigin)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-
- // Since we wrote idempotent records, we keep VerificationGuard.
- assertEquals(verificationGuard, log.verificationGuard(producerId))
-
- // Now write the transactional records
- assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
- log.appendAsLeader(transactionalRecords, 0, appendOrigin,
RequestLocal.noCaching(), verificationGuard,
- TransactionVersion.TV_2.featureLevel())
- assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
- // VerificationGuard should be cleared now.
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- // A subsequent maybeStartTransactionVerification will be empty since we
are already verified.
- assertEquals(VerificationGuard.SENTINEL,
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch,
true))
-
- // For TV2, the coordinator bumps the epoch before writing the marker
(KIP-890)
- val bumpedEpoch = (producerEpoch + 1).toShort
- val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
- producerId,
- bumpedEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, 0)
- )
-
- log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
- RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_2.featureLevel())
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- if (appendOrigin == AppendOrigin.CLIENT)
- sequence = sequence + 1
-
- // A new maybeStartTransactionVerification will not be empty, as we need
to verify the next transaction.
- // For TV2, after the marker is written with bumped epoch, the producer
state now has the bumped epoch
- val newVerificationGuard =
log.maybeStartTransactionVerification(producerId, sequence, bumpedEpoch, true)
- assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
- assertNotEquals(verificationGuard, newVerificationGuard)
- assertFalse(verificationGuard.verify(newVerificationGuard))
- }
-
- @ParameterizedTest
- @EnumSource(value = classOf[AppendOrigin], names = Array("CLIENT",
"COORDINATOR"))
- def testTransactionIsOngoingAndVerificationGuardTV1(appendOrigin:
AppendOrigin): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
false)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- // For TV1, can start with non-zero sequences even with non-zero epoch
when no existing producer state
- var sequence = if (appendOrigin == AppendOrigin.CLIENT) 3 else 0
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
-
- val idempotentRecords = MemoryRecords.withIdempotentRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
-
- // Only clients have nonzero sequences
- if (appendOrigin == AppendOrigin.CLIENT)
- sequence = sequence + 2
-
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
-
- // For TV1, create verification guard with supportsEpochBump=false
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
sequence, producerEpoch, false)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- log.appendAsLeader(idempotentRecords, 0, appendOrigin)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
-
- // Since we wrote idempotent records, we keep VerificationGuard.
- assertEquals(verificationGuard, log.verificationGuard(producerId))
-
- // Now write the transactional records
- assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
- log.appendAsLeader(transactionalRecords, 0, appendOrigin,
RequestLocal.noCaching(), verificationGuard,
- TransactionVersion.TV_1.featureLevel())
- assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
- // VerificationGuard should be cleared now.
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- // A subsequent maybeStartTransactionVerification will be empty since we
are already verified.
- assertEquals(VerificationGuard.SENTINEL,
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch,
false))
-
- val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
- producerId,
- producerEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, 0)
- )
-
- log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
- RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_1.featureLevel())
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- if (appendOrigin == AppendOrigin.CLIENT)
- sequence = sequence + 1
-
- // A new maybeStartTransactionVerification will not be empty, as we need
to verify the next transaction.
- val newVerificationGuard =
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch,
false)
- assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
- assertNotEquals(verificationGuard, newVerificationGuard)
- assertFalse(verificationGuard.verify(newVerificationGuard))
- }
-
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testEmptyTransactionStillClearsVerificationGuard(supportsEpochBump:
Boolean): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
0, producerEpoch, supportsEpochBump)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- val endMarkerProducerEpoch = if (supportsEpochBump) (producerEpoch +
1).toShort else producerEpoch
- val transactionVersion = if (supportsEpochBump)
TransactionVersion.TV_2.featureLevel() else
TransactionVersion.TV_1.featureLevel()
- val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
- producerId,
- endMarkerProducerEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, 0)
- )
-
- log.appendAsLeader(endTransactionMarkerRecord, 0,
AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
- VerificationGuard.SENTINEL, transactionVersion)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
- }
-
- @Test
- def testNextTransactionVerificationGuardNotCleared(): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
0, producerEpoch, true)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- // If the producer epoch is the same on the EndTxn marker, the
verification must be for the next transaction, so we shouldn't clear it.
- val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
- producerId,
- producerEpoch,
- new EndTransactionMarker(ControlRecordType.COMMIT, 0)
- )
-
- log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
- RequestLocal.noCaching(), VerificationGuard.SENTINEL,
TransactionVersion.TV_0.featureLevel())
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(verificationGuard, log.verificationGuard(producerId))
- }
-
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testDisabledVerificationClearsVerificationGuard(supportsEpochBump:
Boolean): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
0, producerEpoch, supportsEpochBump)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- producerStateManagerConfig.setTransactionVerificationEnabled(false)
-
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- 0,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
- log.appendAsLeader(transactionalRecords, 0)
-
- assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
- }
-
- @Test
- def testEnablingVerificationWhenRequestIsAtLogLayer(): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
false)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- val sequence = 0
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
-
- producerStateManagerConfig.setTransactionVerificationEnabled(true)
-
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
- assertThrows(classOf[InvalidTxnStateException], () =>
log.appendAsLeader(transactionalRecords, 0))
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- val verificationGuard = log.maybeStartTransactionVerification(producerId,
sequence, producerEpoch, true)
- assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
-
- log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching,
- verificationGuard, TransactionVersion.TV_2.featureLevel())
- assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
- }
-
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def
testNonZeroSequenceOnFirstAppendNonZeroEpoch(transactionVerificationEnabled:
Boolean): Unit = {
- val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
transactionVerificationEnabled)
-
- val producerId = 23L
- val producerEpoch = 1.toShort
- val sequence = 3
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
- val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
- assertFalse(log.hasOngoingTransaction(producerId, producerEpoch))
- assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
-
- val transactionalRecords = MemoryRecords.withTransactionalRecords(
- Compression.NONE,
- producerId,
- producerEpoch,
- sequence,
- new SimpleRecord("1".getBytes),
- new SimpleRecord("2".getBytes)
- )
-
- if (transactionVerificationEnabled) {
- // TV2 behavior: Create verification state that supports epoch bumps
- val verificationGuard =
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch, true)
- // Should reject non-zero sequences when there's no existing producer
state
- assertThrows(classOf[OutOfOrderSequenceException], () =>
- log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, verificationGuard,
- TransactionVersion.TV_0.featureLevel()))
- } else {
- // TV1 behavior: Create verification state with supportsEpochBump=false
- val verificationGuard =
log.maybeStartTransactionVerification(producerId, sequence, producerEpoch,
false)
- // Should allow non-zero sequences with non-zero epoch
- log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, verificationGuard,
- TransactionVersion.TV_0.featureLevel())
- assertTrue(log.hasOngoingTransaction(producerId, producerEpoch))
- }
- }
-
@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure():
Unit = {
val logConfig = LogTestUtils.createLogConfig()
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index bcefc6679aa..eb6957f13ac 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -25,12 +25,14 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InconsistentTopicIdException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AbortedTxn;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.FetchResponseData;
@@ -56,6 +58,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.common.RequestLocal;
import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
@@ -85,6 +88,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -123,6 +127,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1447,7 +1452,7 @@ public class UnifiedLogTest {
public MemoryRecords.RecordFilter.BatchRetentionResult
checkBatchRetention(RecordBatch batch) {
return new
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY,
false);
}
- @Override
+ @Override
public boolean shouldRetainRecord(RecordBatch recordBatch, Record
record) {
return !record.hasKey();
}
@@ -1496,7 +1501,7 @@ public class UnifiedLogTest {
ByteBuffer filtered = ByteBuffer.allocate(2048);
records.filterTo(new MemoryRecords.RecordFilter(0, 0) {
- @Override
+ @Override
public MemoryRecords.RecordFilter.BatchRetentionResult
checkBatchRetention(RecordBatch batch) {
return new
MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.RETAIN_EMPTY,
true);
}
@@ -1679,7 +1684,7 @@ public class UnifiedLogTest {
log.updateHighWatermark(log.logEndOffset());
log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
assertEquals(
- 2,
+ 2,
log.deleteOldSegments(),
"Expecting two segment deletions as log start offset retention
should unblock time based retention"
);
@@ -1750,7 +1755,7 @@ public class UnifiedLogTest {
.sorted()
.collect(Collectors.toList());
assertEquals(
- expectedSnapshotOffsets,
+ expectedSnapshotOffsets,
snapshotOffsets,
"expected a snapshot file per segment base offset, except the
first segment"
);
@@ -2415,6 +2420,28 @@ public class UnifiedLogTest {
producerStateManagerConfig, true, Optional.empty(), false);
}
+ private UnifiedLog createLog(File dir, LogConfig config,
LogOffsetsListener logOffsetsListener) throws IOException {
+ UnifiedLog log = UnifiedLog.create(
+ dir, config, 0L, 0L, mockTime.scheduler, brokerTopicStats,
mockTime,
+ 3600000, producerStateManagerConfig,
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ new LogDirFailureChannel(10), true, Optional.empty(), new
ConcurrentHashMap<>(),
+ false, logOffsetsListener);
+ logsToClose.add(log);
+ return log;
+ }
+
+ private UnifiedLog createLog(File dir, LogConfig config,
ProducerStateManagerConfig psmConfig) throws IOException {
+ UnifiedLog log = UnifiedLog.create(
+ dir, config, 0L, 0L, mockTime.scheduler, brokerTopicStats,
mockTime,
+ 3600000, psmConfig,
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ new LogDirFailureChannel(10), true, Optional.empty(), new
ConcurrentHashMap<>(),
+ false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+ logsToClose.add(log);
+ return log;
+ }
+
private UnifiedLog createLog(
File dir,
LogConfig config,
@@ -2579,7 +2606,7 @@ public class UnifiedLogTest {
.build();
log = createLog(logDir, logConfig, true);
- String metricName = "name=RetentionSizeInPercent,topic=" +
log.topicPartition().topic() +
+ String metricName = "name=RetentionSizeInPercent,topic=" +
log.topicPartition().topic() +
",partition=" + log.topicPartition().partition();
// Append some messages to create 3 segments (15 records / 5 records
per segment = 3 segments)
@@ -2618,7 +2645,7 @@ public class UnifiedLogTest {
.build();
log = createLog(logDir, logConfig, false);
- String metricName = "name=RetentionSizeInPercent,topic=" +
log.topicPartition().topic() +
+ String metricName = "name=RetentionSizeInPercent,topic=" +
log.topicPartition().topic() +
",partition=" + log.topicPartition().partition();
for (int i = 0; i < 10; i++) {
@@ -2692,7 +2719,7 @@ public class UnifiedLogTest {
assertEquals(200, yammerMetricValue(metricName),
"Metric should be updated in finally block even when exception
occurs");
}
-
+
@Test
public void testReadWithMinMessage() throws IOException {
LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
@@ -2810,7 +2837,7 @@ public class UnifiedLogTest {
Compression.NONE,
new SimpleRecord(mockTime.milliseconds(), null,
"Test".getBytes())
);
-
+
log.appendAsLeader(message, 0);
log.roll();
assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".log")).length);
@@ -3028,7 +3055,7 @@ public class UnifiedLogTest {
log.appendAsLeader(first, 0);
assertThrows(
- RecordTooLargeException.class,
+ RecordTooLargeException.class,
() -> log.appendAsLeader(second, 0),
"Second message set should throw MessageSizeTooLargeException."
);
@@ -3436,7 +3463,7 @@ public class UnifiedLogTest {
UnifiedLog log = createLog(logDir, logConfig);
// Test initial state before any records
- assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
+ assertFetchOffsetBySpecialTimestamp(log, Optional.empty(),
new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1,
Optional.of(-1)),
ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
@@ -4230,7 +4257,7 @@ public class UnifiedLogTest {
private void prepare(int logStartOffset) throws IOException {
RemoteLogManagerConfig config = createRemoteLogManagerConfig();
- DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory =
+ DelayedOperationPurgatory<DelayedRemoteListOffsets> purgatory =
new DelayedOperationPurgatory<>("RemoteListOffsets", 0);
remoteLogManager = spy(new RemoteLogManager(
config,
@@ -4279,9 +4306,9 @@ public class UnifiedLogTest {
return result;
}
- private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log,
+ private void assertFetchOffsetBySpecialTimestamp(UnifiedLog log,
Optional<RemoteLogManager> remoteLogManagerOpt,
-
FileRecords.TimestampAndOffset expected,
+
FileRecords.TimestampAndOffset expected,
long timestamp) {
Optional<AsyncOffsetReader> remoteOffsetReader =
remoteLogManagerOpt.map(rlm -> rlm);
OffsetResultHolder offsetResultHolder =
log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader);
@@ -5010,4 +5037,461 @@ public class UnifiedLogTest {
}
builder.close();
}
+
+ @Test
+ public void testMaybeUpdateHighWatermarkAsFollower() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ for (int i = 0; i < 100; i++) {
+ MemoryRecords records = singletonRecords(("test" + i).getBytes());
+ log.appendAsLeader(records, 0);
+ }
+
+ assertEquals(Optional.of(99L), log.maybeUpdateHighWatermark(99L));
+ assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(99L));
+
+ assertEquals(Optional.of(100L), log.maybeUpdateHighWatermark(100L));
+ assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(100L));
+
+ // bound by the log end offset
+ assertEquals(Optional.empty(), log.maybeUpdateHighWatermark(101L));
+ }
+
+    /**
+     * Checks {@code remoteLogEnabled()} for several configurations: it is only true when the
+     * topic config enables remote log storage, the three-argument {@code createLog} is called
+     * with {@code true}, and the cleanup policy does not include compaction.
+     */
+    @Test
+    public void testEnableRemoteLogStorageOnCompactedTopics() throws IOException {
+        // Default topic config, with the createLog flag unset and then set.
+        LogConfig defaultConfig = new LogTestUtils.LogConfigBuilder().build();
+        assertFalse(createLog(logDir, defaultConfig).remoteLogEnabled());
+        assertFalse(createLog(logDir, defaultConfig, true).remoteLogEnabled());
+
+        // Remote storage enabled on the topic as well.
+        LogConfig remoteEnabledConfig = new LogTestUtils.LogConfigBuilder()
+            .remoteLogStorageEnable(true)
+            .build();
+        assertTrue(createLog(logDir, remoteEnabledConfig, true).remoteLogEnabled());
+
+        // A compacted topic never reports remote log storage as enabled.
+        LogConfig compactConfig = new LogTestUtils.LogConfigBuilder()
+            .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+            .remoteLogStorageEnable(true)
+            .build();
+        assertFalse(createLog(logDir, compactConfig, true).remoteLogEnabled());
+
+        // Neither does a topic that combines compaction with deletion.
+        String compactAndDelete = TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE;
+        LogConfig compactAndDeleteConfig = new LogTestUtils.LogConfigBuilder()
+            .cleanupPolicy(compactAndDelete)
+            .remoteLogStorageEnable(true)
+            .build();
+        assertFalse(createLog(logDir, compactAndDeleteConfig, true).remoteLogEnabled());
+    }
+
+    /**
+     * For the remote log metadata topic and each internal topic listed below, creates a log
+     * in a directory named after the partition and checks that {@code remoteLogEnabled()}
+     * is false even though the topic config enables remote log storage.
+     */
+    @Test
+    public void testRemoteLogStorageIsDisabledOnInternalAndRemoteLogMetadataTopic() throws IOException {
+        List<TopicPartition> partitions = List.of(
+            new TopicPartition(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME, 0),
+            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+            new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 0),
+            new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0),
+            new TopicPartition(Topic.CLUSTER_METADATA_TOPIC_NAME, 0)
+        );
+        // The config does not depend on the partition, so build it once.
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build();
+        for (TopicPartition partition : partitions) {
+            File internalLogDir = new File(TestUtils.tempDirectory(), partition.toString());
+            // File.mkdir() signals failure through its return value; fail here instead of
+            // letting a missing directory surface as an unrelated error later in the test.
+            assertTrue(internalLogDir.mkdir(), "Failed to create log directory " + internalLogDir);
+            UnifiedLog log = createLog(internalLogDir, logConfig, true);
+            assertFalse(log.remoteLogEnabled());
+        }
+    }
+
+ @Test
+ public void testNoOpWhenRemoteLogStorageIsDisabled() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ for (int i = 0; i < 100; i++) {
+ MemoryRecords records = singletonRecords(("test" + i).getBytes());
+ log.appendAsLeader(records, 0);
+ }
+
+ log.updateHighWatermark(90L);
+ log.maybeIncrementLogStartOffset(20L,
LogStartOffsetIncrementReason.SegmentDeletion);
+ assertEquals(20, log.logStartOffset());
+ }
+
+    /**
+     * With remote log storage enabled, advancing the log start offset also moves the local
+     * log start offset; a subsequent full truncation of the local log moves only the local
+     * log start offset.
+     */
+    @Test
+    public void testStartOffsetsRemoteLogStorageIsEnabled() throws IOException {
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().remoteLogStorageEnable(true).build(),
+            true);
+
+        int appended = 0;
+        while (appended < 100) {
+            log.appendAsLeader(singletonRecords(("test" + appended).getBytes()), 0);
+            appended++;
+        }
+
+        log.updateHighWatermark(80L);
+        final long logStart = 40L;
+        log.maybeIncrementLogStartOffset(logStart, LogStartOffsetIncrementReason.SegmentDeletion);
+        assertEquals(logStart, log.logStartOffset());
+        assertEquals(log.logStartOffset(), log.localLogStartOffset());
+
+        // After truncating the local log, both offsets must hold the values passed in.
+        final long localLogStart = 60L;
+        log.truncateFullyAndStartAt(localLogStart, Optional.of(logStart));
+        assertEquals(logStart, log.logStartOffset());
+        assertEquals(localLogStart, log.localLogStartOffset());
+    }
+
+    /**
+     * Registers a listener at log creation and checks the high watermark it is told about
+     * after creation, after a high watermark increment, after {@code truncateTo} and after
+     * {@code truncateFullyAndStartAt}.
+     */
+    @Test
+    public void testLogOffsetsListener() throws IOException {
+        MockLogOffsetsListener offsetsListener = new MockLogOffsetsListener();
+        offsetsListener.verify();
+
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build(),
+            offsetsListener);
+
+        offsetsListener.verify(0L);
+
+        // Two batches of three records each: offsets 0 through 5.
+        for (int batch = 0; batch < 2; batch++) {
+            List<SimpleRecord> batchRecords = List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+            );
+            log.appendAsLeader(records(batchRecords, 0L, 0), 0);
+        }
+
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(4));
+        offsetsListener.verify(4L);
+
+        log.truncateTo(3);
+        offsetsListener.verify(3L);
+
+        List<SimpleRecord> afterTruncation = List.of(
+            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records(afterTruncation, 0L, 0), 0);
+        log.truncateFullyAndStartAt(4, Optional.empty());
+        offsetsListener.verify(4L);
+    }
+
+    /**
+     * Listener that remembers the most recent high watermark it was notified about until
+     * the next call to {@code verify}.
+     */
+    private static class MockLogOffsetsListener implements LogOffsetsListener {
+        /** Marker meaning "no callback since the last verification". */
+        private static final long NOT_UPDATED = -1L;
+
+        private long highWatermark = NOT_UPDATED;
+
+        @Override
+        public void onHighWatermarkUpdated(long offset) {
+            this.highWatermark = offset;
+        }
+
+        /**
+         * Asserts that no callback has been triggered since the last verification.
+         */
+        public void verify() {
+            verify(NOT_UPDATED);
+        }
+
+        /**
+         * Checks the value recorded since the last verification and then resets it.
+         * A value of {@code -1} means no update was recorded.
+         */
+        public void verify(long expectedHighWatermark) {
+            assertEquals(expectedHighWatermark, this.highWatermark, "Unexpected high watermark");
+            this.highWatermark = NOT_UPDATED;
+        }
+    }
+
+    /**
+     * Installs a listener on a log that already has a high watermark and checks that the
+     * listener hears nothing at installation time, but is notified of the next high
+     * watermark increment.
+     */
+    @Test
+    public void testUpdateLogOffsetsListener() throws IOException {
+        UnifiedLog log = createLog(logDir, new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build());
+
+        List<SimpleRecord> beforeListener = List.of(
+            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records(beforeListener, 0L, 0), 0);
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(2));
+        log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.SegmentDeletion);
+
+        MockLogOffsetsListener offsetsListener = new MockLogOffsetsListener();
+        offsetsListener.verify();
+
+        log.setLogOffsetsListener(offsetsListener);
+        // Setting the listener does not invoke it, so nothing has been recorded yet.
+        offsetsListener.verify();
+
+        List<SimpleRecord> afterListener = List.of(
+            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
+            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records(afterListener, 0L, 0), 0);
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(4));
+        offsetsListener.verify(4L);
+    }
+
+    /**
+     * Walks one producer through an idempotent append, a transactional append and a COMMIT
+     * marker written with a bumped epoch, all at the TV_2 feature level. After every step it
+     * checks whether a transaction is ongoing and which VerificationGuard the log holds for
+     * the producer.
+     *
+     * @param appendOrigin origin used for the data appends (CLIENT or COORDINATOR)
+     */
+    @ParameterizedTest
+    @EnumSource(value = AppendOrigin.class, names = {"CLIENT", "COORDINATOR"})
+    public void testTransactionIsOngoingAndVerificationGuardTV2(AppendOrigin appendOrigin) throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        final short tv2 = TransactionVersion.TV_2.featureLevel();
+        final boolean fromClient = appendOrigin == AppendOrigin.CLIENT;
+        // For TV2 the first sequence must be 0 when there is no producer state yet,
+        // regardless of the append origin.
+        final int idempotentSequence = 0;
+        // Only clients use nonzero sequences; other origins stay at the initial value.
+        final int transactionalSequence = fromClient ? idempotentSequence + 2 : idempotentSequence;
+        final int nextTransactionSequence = fromClient ? transactionalSequence + 1 : transactionalSequence;
+
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+        assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL));
+
+        MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(
+            Compression.NONE, producerId, producerEpoch, idempotentSequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, producerEpoch, transactionalSequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, transactionalSequence, producerEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        log.appendAsLeader(idempotentRecords, 0, appendOrigin);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+
+        // An idempotent append leaves the VerificationGuard in place.
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+
+        // The transactional append is made with the guard obtained above.
+        assertTrue(log.verificationGuard(producerId).verify(verificationGuard));
+        log.appendAsLeader(transactionalRecords, 0, appendOrigin, RequestLocal.noCaching(),
+            verificationGuard, tv2);
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        // The transactional append clears the VerificationGuard.
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        // Already verified, so asking again yields the sentinel.
+        assertEquals(VerificationGuard.SENTINEL,
+            log.maybeStartTransactionVerification(producerId, transactionalSequence, producerEpoch, true));
+
+        // For TV2, the coordinator bumps the epoch before writing the marker (KIP-890).
+        final short bumpedEpoch = (short) (producerEpoch + 1);
+        MemoryRecords endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+            producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
+            RequestLocal.noCaching(), VerificationGuard.SENTINEL, tv2);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        // The next transaction needs its own verification. The marker was written with the
+        // bumped epoch, so that is the epoch the producer state now carries.
+        VerificationGuard newVerificationGuard = log.maybeStartTransactionVerification(
+            producerId, nextTransactionSequence, bumpedEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard);
+        assertNotEquals(verificationGuard, newVerificationGuard);
+        assertFalse(verificationGuard.verify(newVerificationGuard));
+    }
+
+    /**
+     * Walks one producer through an idempotent append, a transactional append and a COMMIT
+     * marker written with the unchanged epoch, all at the TV_1 feature level. After every
+     * step it checks whether a transaction is ongoing and which VerificationGuard the log
+     * holds for the producer.
+     *
+     * @param appendOrigin origin used for the data appends (CLIENT or COORDINATOR)
+     */
+    @ParameterizedTest
+    @EnumSource(value = AppendOrigin.class, names = {"CLIENT", "COORDINATOR"})
+    public void testTransactionIsOngoingAndVerificationGuardTV1(AppendOrigin appendOrigin) throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, false);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        final short tv1 = TransactionVersion.TV_1.featureLevel();
+        final boolean fromClient = appendOrigin == AppendOrigin.CLIENT;
+        // For TV1 a producer without existing state may start at a nonzero sequence even
+        // with a nonzero epoch.
+        final int idempotentSequence = fromClient ? 3 : 0;
+        // Only clients use nonzero sequences; other origins stay at the initial value.
+        final int transactionalSequence = fromClient ? idempotentSequence + 2 : idempotentSequence;
+        final int nextTransactionSequence = fromClient ? transactionalSequence + 1 : transactionalSequence;
+
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+        assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL));
+
+        MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(
+            Compression.NONE, producerId, producerEpoch, idempotentSequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, producerEpoch, transactionalSequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        // For TV1 the guard is requested with supportsEpochBump=false.
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, transactionalSequence, producerEpoch, false);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        log.appendAsLeader(idempotentRecords, 0, appendOrigin);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+
+        // An idempotent append leaves the VerificationGuard in place.
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+
+        // The transactional append is made with the guard obtained above.
+        assertTrue(log.verificationGuard(producerId).verify(verificationGuard));
+        log.appendAsLeader(transactionalRecords, 0, appendOrigin, RequestLocal.noCaching(),
+            verificationGuard, tv1);
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        // The transactional append clears the VerificationGuard.
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        // Already verified, so asking again yields the sentinel.
+        assertEquals(VerificationGuard.SENTINEL,
+            log.maybeStartTransactionVerification(producerId, transactionalSequence, producerEpoch, false));
+
+        MemoryRecords endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+            producerId, producerEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
+            RequestLocal.noCaching(), VerificationGuard.SENTINEL, tv1);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        // The next transaction needs its own verification, so a fresh guard is handed out.
+        VerificationGuard newVerificationGuard = log.maybeStartTransactionVerification(
+            producerId, nextTransactionSequence, producerEpoch, false);
+        assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard);
+        assertNotEquals(verificationGuard, newVerificationGuard);
+        assertFalse(verificationGuard.verify(newVerificationGuard));
+    }
+
+    /**
+     * Starts a verification and then writes only a COMMIT marker, with no transactional
+     * data in between. The marker must leave no ongoing transaction and must reset the
+     * VerificationGuard to the sentinel.
+     *
+     * @param supportsEpochBump when true the marker carries epoch + 1 and is appended at
+     *                          TV_2; otherwise it carries the same epoch and uses TV_1
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEmptyTransactionStillClearsVerificationGuard(boolean supportsEpochBump) throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, 0, producerEpoch, supportsEpochBump);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        final short endMarkerProducerEpoch;
+        final short transactionVersion;
+        if (supportsEpochBump) {
+            endMarkerProducerEpoch = (short) (producerEpoch + 1);
+            transactionVersion = TransactionVersion.TV_2.featureLevel();
+        } else {
+            endMarkerProducerEpoch = producerEpoch;
+            transactionVersion = TransactionVersion.TV_1.featureLevel();
+        }
+        MemoryRecords endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+            producerId, endMarkerProducerEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
+            RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+    }
+
+    /**
+     * Starts a verification with supportsEpochBump=true and then writes a COMMIT marker that
+     * carries the same producer epoch. The existing VerificationGuard must survive the marker.
+     */
+    @Test
+    public void testNextTransactionVerificationGuardNotCleared() throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, 0, producerEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        // An EndTxn marker with an unchanged producer epoch means the pending verification
+        // belongs to the next transaction, so it must not be cleared.
+        MemoryRecords endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+            producerId, producerEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
+        log.appendAsLeader(endTransactionMarkerRecord, 0, AppendOrigin.COORDINATOR,
+            RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_0.featureLevel());
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(verificationGuard, log.verificationGuard(producerId));
+    }
+
+    /**
+     * Starts a verification, then switches transaction verification off in the config and
+     * appends transactional records without passing a guard. The append must start a
+     * transaction and reset the VerificationGuard to the sentinel.
+     *
+     * @param supportsEpochBump forwarded to {@code maybeStartTransactionVerification}
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDisabledVerificationClearsVerificationGuard(boolean supportsEpochBump) throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, true);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, 0, producerEpoch, supportsEpochBump);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        // Verification is turned off only after the guard has been handed out.
+        psmConfig.setTransactionVerificationEnabled(false);
+
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, producerEpoch, 0,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+        log.appendAsLeader(transactionalRecords, 0);
+
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+    }
+
+    /**
+     * Creates the log with transaction verification off and then switches it on. A
+     * transactional append without a guard must now fail with
+     * {@code InvalidTxnStateException}; the same records succeed once a guard has been
+     * obtained and passed in.
+     */
+    @Test
+    public void testEnablingVerificationWhenRequestIsAtLogLayer() throws IOException {
+        ProducerStateManagerConfig psmConfig = new ProducerStateManagerConfig(86400000, false);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        final int sequence = 0;
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+
+        psmConfig.setTransactionVerificationEnabled(true);
+
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, producerEpoch, sequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        // No guard was obtained, so the append is rejected and leaves no trace.
+        assertThrows(InvalidTxnStateException.class, () -> log.appendAsLeader(transactionalRecords, 0));
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, sequence, producerEpoch, true);
+        assertNotEquals(VerificationGuard.SENTINEL, verificationGuard);
+
+        // With the guard supplied, the append goes through and the guard is cleared.
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(),
+            verificationGuard, TransactionVersion.TV_2.featureLevel());
+        assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+    }
+
+    /**
+     * Appends a first transactional batch with sequence 3 and epoch 1 for a producer that
+     * has no state yet.
+     *
+     * @param transactionVerificationEnabled used both as the verification flag of the
+     *        ProducerStateManagerConfig and as the supportsEpochBump argument of
+     *        {@code maybeStartTransactionVerification}. When true the append must fail with
+     *        {@code OutOfOrderSequenceException}; when false it must succeed.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testNonZeroSequenceOnFirstAppendNonZeroEpoch(boolean transactionVerificationEnabled) throws IOException {
+        ProducerStateManagerConfig psmConfig =
+            new ProducerStateManagerConfig(86400000, transactionVerificationEnabled);
+
+        final long producerId = 23L;
+        final short producerEpoch = 1;
+        final int sequence = 3;
+        UnifiedLog log = createLog(
+            logDir,
+            new LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build(),
+            psmConfig);
+        assertFalse(log.hasOngoingTransaction(producerId, producerEpoch));
+        assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId));
+
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, producerEpoch, sequence,
+            new SimpleRecord("1".getBytes()), new SimpleRecord("2".getBytes()));
+
+        VerificationGuard verificationGuard = log.maybeStartTransactionVerification(
+            producerId, sequence, producerEpoch, transactionVerificationEnabled);
+
+        if (!transactionVerificationEnabled) {
+            // Guard requested with supportsEpochBump=false: a nonzero first sequence with a
+            // nonzero epoch is accepted and starts a transaction.
+            log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(),
+                verificationGuard, TransactionVersion.TV_0.featureLevel());
+            assertTrue(log.hasOngoingTransaction(producerId, producerEpoch));
+            return;
+        }
+
+        // Guard requested with supportsEpochBump=true: without existing producer state a
+        // nonzero first sequence is rejected.
+        assertThrows(OutOfOrderSequenceException.class, () ->
+            log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(),
+                verificationGuard, TransactionVersion.TV_0.featureLevel()));
+    }
}