kamalcph commented on code in PR #16884:
URL: https://github.com/apache/kafka/pull/16884#discussion_r2299738443
##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1180,4 +1180,562 @@ class AbstractFetcherThreadTest {
fetcher.processFetchRequest(partitionData, fetchRequestOpt)
assertEquals(0, replicaState.logEndOffset, "FetchResponse should be
ignored when leader epoch does not match")
}
+
+ private def emptyReplicaState(rlmEnabled: Boolean, partition:
TopicPartition, fetcher: MockFetcherThread) = {
+ // Follower begins with an empty log
+ val replicaState = PartitionState(Seq(), leaderEpoch = 0, highWatermark =
0L, rlmEnabled = rlmEnabled)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition ->
initialFetchState(topicIds.get(partition.topic), fetchOffset = 0, leaderEpoch =
0)))
+ replicaState
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Disabled and Leader
LogStartOffset = 0
+ *
+ * Purpose:
+ * - Simulate a leader with logs starting at offset 0 and validate how the
follower
+ * behaves when TieredStorage is disabled.
+ *
+ * Conditions:
+ * - TieredStorage: **Disabled**
+ * - Leader LogStartOffset: **0**
+ *
+ * Scenario:
+ * - The leader starts with a log at offset 0, containing three record
batches offset at 0, 150, and 199.
+ * - The follower begins fetching, and we validate the correctness of its
replica state as it fetches.
+ *
+ * Expected Outcomes:
+ * 1. The follower fetch state should transition to `FETCHING` initially.
+ * 2. After the first poll, one record batch is fetched.
+ * 3. After subsequent polls, the entire leader log is fetched:
+ * - Replica log size: 3
+ * - Replica LogStartOffset: 0
+ * - Replica LogEndOffset: 200
+ * - Replica HighWatermark: 199
+ */
+ @Test
+ def testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetZero():
Unit = {
+ val rlmEnabled = false
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 0
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(1, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(1, replicaState.logEndOffset)
+ assertEquals(Some(1), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Disabled and Leader
LogStartOffset != 0
+ *
+ * Purpose:
+ * - Validate follower behavior when the leader's log starts at a non-zero
offset (10).
+ *
+ * Conditions:
+ * - TieredStorage: **Disabled**
+ * - Leader LogStartOffset: **10**
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+ * - The follower starts at offset 0 and is reset to 10 on an out-of-range error.
+ *
+ * Expected Outcomes:
+ * 1. The follower's initial log is empty.
+ * 2. Replica offsets after polls:
+ * - LogStartOffset = 10
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetNonZero(): Unit
= {
+ val rlmEnabled = false
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ // Follower gets out-of-range error (no messages received), fetch offset
is updated from 0 to 10
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset = 0, and No Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled and validate follower fetching behavior
when the leader
+ * log starts at 0 and no segments have been uploaded or deleted locally.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **0** (No local segments deleted).
+ *
+ * Scenario:
+ * - The leader log contains three record batches at offsets 0, 150, and 199.
+ * - The follower starts fetching from offset 0.
+ *
+ * Expected Outcomes:
+ * 1. The replica log accurately reflects the leader's log:
+ * - LogStartOffset = 0
+ * - LocalLogStartOffset = 0
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroNoLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 0
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(1, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(1, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ assertEquals(Some(1), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset = 0, and Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled with some segments uploaded and deleted
locally, causing
+ * a difference between the leader's LogStartOffset (0) and
LocalLogStartOffset (> 0).
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **100** (Some segments deleted locally).
+ *
+ * Scenario:
+ * - The leader log starts at offset 0 but the local leader log starts at
offset 100.
+ * - The follower fetch operation begins from offset 0.
+ *
+ * Expected Outcomes:
+ * 1. After offset adjustments for local deletions:
+ * - LogStartOffset = 0
+ * - LocalLogStartOffset = 100
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroWithLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(100, replicaState.logEndOffset)
+ assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset != 0, and No Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled and validate follower fetch behavior
when the leader's log
+ * starts at a non-zero offset and no local deletions have occurred.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **10**
+ * - Leader LocalLogStartOffset: **10** (No deletions).
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+ * - The follower starts fetching at offset 0 and is then moved to offset 10.
+ *
+ * Expected Outcomes:
+ * 1. After fetching, the replica log matches the leader:
+ * - LogStartOffset = 10
+ * - LocalLogStartOffset = 10
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetNonZeroNoLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset != 0, and Local Deletions
+ *
+ * Purpose:
+ * - Validate follower adjustments when the leader has log deletions causing
+ * LocalLogStartOffset > LogStartOffset.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **10**
+ * - Leader LocalLogStartOffset: **100** (All older segments deleted
locally).
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 but the local log starts at offset
100.
+ * - The follower fetch starts at offset 0, is moved to 10, then to 100 (local deletions).
+ *
+ * Expected Outcomes:
+ * 1. Initial fetch offset adjustments:
+ * - First adjustment: LogEndOffset = 10 (after offset-out-of-range error)
+ * - Second adjustment: LogEndOffset = 100 (after
offset-moved-to-tiered-storage error)
+ * 2. After successful fetches:
+ * - 3 record batches fetched
+ * - LogStartOffset = 10
+ * - LocalLogStartOffset = 100
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetNonZeroWithLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ )
+ leaderState.logStartOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ // On offset-out-of-range error, fetch offset is updated
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ fetcher.doWork()
+ // On offset-moved-to-tiered-storage error, fetch offset is updated
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(100, replicaState.logEndOffset)
+ assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, All Local Segments
Deleted
+ *
+ * Purpose:
+ * - Handle scenarios where all local segments have been deleted:
+ * - LocalLogStartOffset > LogStartOffset.
+ * - LocalLogStartOffset = LogEndOffset.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0 or > 0**
+ * - Leader LocalLogStartOffset: Leader LogEndOffset (all segments deleted
locally).
+ *
+ * Expected Outcomes:
+ * 1. Follower state is adjusted to reflect local deletions:
+ * - LocalLogStartOffset = LogEndOffset.
+ * - No new data remains to fetch.
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroAllLocalSegmentsDeleted():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 151L,
+ rlmEnabled = rlmEnabled
+ )
+ leaderState.logStartOffset = 0
+ // Set Local Log Start Offset to Log End Offset
+ leaderState.localLogStartOffset = 151
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+
+ // On offset-moved-to-tiered-storage error, fetch offset is updated
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(151, replicaState.localLogStartOffset)
+ assertEquals(151, replicaState.logEndOffset)
+ assertEquals(151, replicaState.highWatermark)
+ assertEquals(Some(151), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Call once again to see if new data is received
+ fetcher.doWork()
+ // No metadata update expected
+ assertEquals(0, replicaState.log.size)
+ assertEquals(151, replicaState.localLogStartOffset)
Review Comment:
Shall we also assert `logStartOffset` here, alongside `localLogStartOffset`?
##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1180,4 +1180,562 @@ class AbstractFetcherThreadTest {
fetcher.processFetchRequest(partitionData, fetchRequestOpt)
assertEquals(0, replicaState.logEndOffset, "FetchResponse should be
ignored when leader epoch does not match")
}
+
+ private def emptyReplicaState(rlmEnabled: Boolean, partition:
TopicPartition, fetcher: MockFetcherThread) = {
+ // Follower begins with an empty log
+ val replicaState = PartitionState(Seq(), leaderEpoch = 0, highWatermark =
0L, rlmEnabled = rlmEnabled)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition ->
initialFetchState(topicIds.get(partition.topic), fetchOffset = 0, leaderEpoch =
0)))
+ replicaState
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Disabled and Leader
LogStartOffset = 0
+ *
+ * Purpose:
+ * - Simulate a leader with logs starting at offset 0 and validate how the
follower
+ * behaves when TieredStorage is disabled.
+ *
+ * Conditions:
+ * - TieredStorage: **Disabled**
+ * - Leader LogStartOffset: **0**
+ *
+ * Scenario:
+ * - The leader starts with a log at offset 0, containing three record
batches offset at 0, 150, and 199.
+ * - The follower begins fetching, and we validate the correctness of its
replica state as it fetches.
+ *
+ * Expected Outcomes:
+ * 1. The follower fetch state should transition to `FETCHING` initially.
+ * 2. After the first poll, one record batch is fetched.
+ * 3. After subsequent polls, the entire leader log is fetched:
+ * - Replica log size: 3
+ * - Replica LogStartOffset: 0
+ * - Replica LogEndOffset: 200
+ * - Replica HighWatermark: 199
+ */
+ @Test
+ def testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetZero():
Unit = {
+ val rlmEnabled = false
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 0
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(1, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(1, replicaState.logEndOffset)
+ assertEquals(Some(1), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Disabled and Leader
LogStartOffset != 0
+ *
+ * Purpose:
+ * - Validate follower behavior when the leader's log starts at a non-zero
offset (10).
+ *
+ * Conditions:
+ * - TieredStorage: **Disabled**
+ * - Leader LogStartOffset: **10**
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+ * - The follower starts at offset 0 and is reset to 10 on an out-of-range error.
+ *
+ * Expected Outcomes:
+ * 1. The follower's initial log is empty.
+ * 2. Replica offsets after polls:
+ * - LogStartOffset = 10
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageDisabledLeaderLogStartOffsetNonZero(): Unit
= {
+ val rlmEnabled = false
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ // Follower gets out-of-range error (no messages received), fetch offset
is updated from 0 to 10
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset = 0, and No Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled and validate follower fetching behavior
when the leader
+ * log starts at 0 and no segments have been uploaded or deleted locally.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **0** (No local segments deleted).
+ *
+ * Scenario:
+ * - The leader log contains three record batches at offsets 0, 150, and 199.
+ * - The follower starts fetching from offset 0.
+ *
+ * Expected Outcomes:
+ * 1. The replica log accurately reflects the leader's log:
+ * - LogStartOffset = 0
+ * - LocalLogStartOffset = 0
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroNoLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 0
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(1, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(1, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ assertEquals(Some(1), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 2) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(0, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset = 0, and Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled with some segments uploaded and deleted
locally, causing
+ * a difference between the leader's LogStartOffset (0) and
LocalLogStartOffset (> 0).
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: **100** (Some segments deleted locally).
+ *
+ * Scenario:
+ * - The leader log starts at offset 0 but the local leader log starts at
offset 100.
+ * - The follower fetch operation begins from offset 0.
+ *
+ * Expected Outcomes:
+ * 1. After offset adjustments for local deletions:
+ * - LogStartOffset = 0
+ * - LocalLogStartOffset = 100
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroWithLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled
+ )
+ leaderState.logStartOffset = 0
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(100, replicaState.logEndOffset)
+ assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ assertEquals(3, replicaState.log.size)
+ assertEquals(0, replicaState.logStartOffset)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset != 0, and No Local Deletions
+ *
+ * Purpose:
+ * - Simulate TieredStorage enabled and validate follower fetch behavior
when the leader's log
+ * starts at a non-zero offset and no local deletions have occurred.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **10**
+ * - Leader LocalLogStartOffset: **10** (No deletions).
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 with batches at 10, 150, and 199.
+ * - The follower starts fetching at offset 0 and is then moved to offset 10.
+ *
+ * Expected Outcomes:
+ * 1. After fetching, the replica log matches the leader:
+ * - LogStartOffset = 10
+ * - LocalLogStartOffset = 10
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetNonZeroNoLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ // Follower side: empty log, registered on the fetcher at fetch offset 0, epoch 0.
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ // Leader side: three single-record batches, the first one at base offset 10.
+ val leaderLog = Seq(
+ // LogStartOffset = LocalLogStartOffset = 10
+ mkBatch(baseOffset = 10, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ // No start offset is overridden on the leader state in this test.
+ // NOTE(review): presumably PartitionState derives both start offsets from the
+ // first batch (10) - confirm against PartitionState.
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ )
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+ // Give the mock leader the fetcher's replicaPartitionState as its callback.
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ // First pass appends nothing: only the follower's offsets move from 0 up to 10.
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Three passes, one per leader batch.
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ // Follower now holds all 3 batches; the end offset is one past the last base
+ // offset (199) and the high watermark equals the leader's (199).
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset != 0, and Local Deletions
+ *
+ * Purpose:
+ * - Validate follower adjustments when the leader has log deletions causing
+ * LocalLogStartOffset > LogStartOffset.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **10**
+ * - Leader LocalLogStartOffset: **100** (All older segments deleted
locally).
+ *
+ * Scenario:
+ * - The leader log starts at offset 10 but the local log starts at offset
100.
+ * - The follower fetch starts at offset 10 but adjusts for local deletions.
+ *
+ * Expected Outcomes:
+ * 1. Initial fetch offset adjustments:
+ * - First adjustment: LogEndOffset = 10 (after offset-out-of-range error)
+ * - Second adjustment: LogEndOffset = 100 (after
offset-moved-to-tiered-storage error)
+ * 2. After successful fetches:
+ * - 3 record batches fetched
+ * - LogStartOffset = 10
+ * - LocalLogStartOffset = 100
+ * - LogEndOffset = 200
+ * - HighWatermark = 199
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetNonZeroWithLocalDeletions():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ // Follower side: empty log, registered on the fetcher at fetch offset 0, epoch 0.
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ // Leader side: the batches still held by the leader begin at base offset 100.
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 199, leaderEpoch = 0, new
SimpleRecord("e".getBytes))
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 199L,
+ rlmEnabled = rlmEnabled,
+ )
+ // Leader's log start offset (10) is placed below its first batch (100) to model
+ // older segments that were deleted locally.
+ leaderState.logStartOffset = 10
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+ // Give the mock leader the fetcher's replicaPartitionState as its callback.
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ // Pass 1: nothing appended; follower offsets move from 0 to the leader's
+ // log start offset (10).
+ fetcher.doWork()
+ // On offset-out-of-range error, fetch offset is updated
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Pass 2: still nothing appended; follower offsets jump from 10 to 100, the
+ // base offset of the leader's first batch.
+ fetcher.doWork()
+ // On offset-moved-to-tiered-storage error, fetch offset is updated
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(100, replicaState.logEndOffset)
+ assertEquals(Some(100), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Three more passes, one per leader batch.
+ // Only 1 record batch is returned after a poll so calling 'n' number of
times to get the desired result.
+ for (_ <- 1 to 3) fetcher.doWork()
+ // Follower holds all 3 batches; logStartOffset (10) and localLogStartOffset
+ // (100) differ, matching the leader.
+ assertEquals(3, replicaState.log.size)
+ assertEquals(10, replicaState.logStartOffset)
+ assertEquals(100, replicaState.localLogStartOffset)
+ assertEquals(200, replicaState.logEndOffset)
+ assertEquals(199, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, All Local Segments
Deleted
+ *
+ * Purpose:
+ * - Handle scenarios where all local segments have been deleted:
+ * - LocalLogStartOffset > LogStartOffset.
+ * - LocalLogStartOffset = LogEndOffset.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **0**
+ * - Leader LocalLogStartOffset: Leader LogEndOffset (all segments deleted
locally).
+ *
+ * Expected Outcomes:
+ * 1. Follower state is adjusted to reflect local deletions:
+ * - LocalLogStartOffset = LogEndOffset.
+ * - No new data remains to fetch.
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetZeroAllLocalSegmentsDeleted():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ // Follower side: empty log, registered on the fetcher at fetch offset 0, epoch 0.
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // Leader batches start at offset 100; localLogStartOffset is overridden to 151 below
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ )
+
+ // High watermark 151 is one past the last batch's base offset (150).
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 151L,
+ rlmEnabled = rlmEnabled
+ )
+ leaderState.logStartOffset = 0
+ // Set Local Log Start Offset to Log End Offset
+ leaderState.localLogStartOffset = 151
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+ // Give the mock leader the fetcher's replicaPartitionState as its callback.
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ // NOTE(review): the follower state after this first pass is not asserted -
+ // confirm that is intentional.
+ fetcher.doWork()
+
+ // On offset-moved-to-tiered-storage error, fetch offset is updated
+ fetcher.doWork()
+ // Nothing is appended; every follower offset lands on 151, the leader's
+ // local log start offset.
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(151, replicaState.localLogStartOffset)
+ assertEquals(151, replicaState.logEndOffset)
+ assertEquals(151, replicaState.highWatermark)
+ assertEquals(Some(151), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Call once again to see if new data is received
+ fetcher.doWork()
+ // No metadata update expected
+ // The extra pass must leave the same values as the assertions above.
+ // NOTE(review): replicaState.logStartOffset is never asserted in this test;
+ // the leader's value is 0 - consider asserting it as well.
+ assertEquals(0, replicaState.log.size)
+ assertEquals(151, replicaState.localLogStartOffset)
+ assertEquals(151, replicaState.logEndOffset)
+ assertEquals(151, replicaState.highWatermark)
+ }
+
+ /**
+ * Test: Empty Follower Fetch with TieredStorage Enabled, Leader
LogStartOffset != 0, and All Local Segments Deleted
+ *
+ * Purpose:
+ * - Validate follower behavior when TieredStorage is enabled, the leader's
log starts at a non-zero offset,
+ * and all local log segments have been deleted.
+ *
+ * Conditions:
+ * - TieredStorage: **Enabled**
+ * - Leader LogStartOffset: **10**
+ * - Leader LocalLogStartOffset: **151** (all older segments deleted
locally).
+ *
+ * Scenario:
+ * - The leader log contains record batches from offset 100, but all local
segments up to offset 151 are deleted.
+ * - The follower starts at LogStartOffset = 10 and adjusts for local
segment deletions.
+ *
+ * Expected Outcomes:
+ * 1. Follower detects offset adjustments due to local deletions:
+ * - LogStartOffset remains 10.
+ * - LocalLogStartOffset updates to 151.
+ * - LogEndOffset updates to 151.
+ * 2. HighWatermark aligns with the leader (151).
+ * 3. No new data is fetched since all relevant segments are deleted.
+ */
+ @Test
+ def
testEmptyFollowerFetchTieredStorageEnabledLeaderLogStartOffsetNonZeroAllLocalSegmentsDeleted():
Unit = {
+ val rlmEnabled = true
+ val partition = new TopicPartition("topic1", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+
+ val replicaState = emptyReplicaState(rlmEnabled, partition, fetcher)
+
+ val leaderLog = Seq(
+ // LocalLogStartOffset = 100
+ mkBatch(baseOffset = 100, leaderEpoch = 0, new
SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 150, leaderEpoch = 0, new
SimpleRecord("d".getBytes)),
+ )
+
+ val leaderState = PartitionState(
+ leaderLog,
+ leaderEpoch = 0,
+ highWatermark = 151L,
+ rlmEnabled = rlmEnabled
+ )
+ leaderState.logStartOffset = 10
+ // Set Local Log Start Offset to Log End Offset
+ leaderState.localLogStartOffset = 151
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ fetcher.doWork()
+
+ // On offset-out-of-range error, fetch offset is updated
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(10, replicaState.localLogStartOffset)
+ assertEquals(10, replicaState.logEndOffset)
+ assertEquals(Some(10), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // On offset-moved-to-tiered-storage error, fetch offset is updated
+ fetcher.doWork()
+ assertEquals(Option(ReplicaState.FETCHING),
fetcher.fetchState(partition).map(_.state))
+ assertEquals(0, replicaState.log.size)
+ assertEquals(151, replicaState.localLogStartOffset)
+ assertEquals(151, replicaState.logEndOffset)
+ assertEquals(151, replicaState.highWatermark)
+ assertEquals(Some(151), fetcher.fetchState(partition).map(_.fetchOffset()))
+
+ // Call once again to see if new data is received
+ fetcher.doWork()
+ // No metadata update expected
+ assertEquals(0, replicaState.log.size)
Review Comment:
Shall we also assert `logStartOffset` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]