Yunyung commented on code in PR #20014:
URL: https://github.com/apache/kafka/pull/20014#discussion_r2164451789
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1225,65 +1196,55 @@ class ReplicaManagerTest {
}
}
- @Test
- def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
- verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new
Properties, expectTruncation = false)
- }
-
/**
* If a partition becomes a follower and the leader is unchanged it should
check for truncation
* if the epoch has increased by more than one (which suggests it has missed
an update). For
* IBP version 2.7 onwards, we don't require this since we can truncate at
any time based
* on diverging epochs returned in fetch responses.
*/
- private def
verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps:
Properties,
-
expectTruncation: Boolean): Unit = {
- val topicPartition = 0
+ @Test
+ def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
+ val extraProps = new Properties
+ val expectTruncation = false
val followerBrokerId = 0
val leaderBrokerId = 1
- val controllerId = 0
- val controllerEpoch = 0
var leaderEpoch = 1
val leaderEpochIncrement = 2
- val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
val offsetFromLeader = 5
-
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new
MockTimer(time),
- topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch,
+ topicPartition.partition(), leaderEpoch + leaderEpochIncrement,
followerBrokerId, leaderBrokerId, countDownLatch,
expectTruncation = expectTruncation, localLogOffset = Optional.of(10),
offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId =
Optional.of(topicId))
try {
// Initialize partition state to follower, with leader = 1, leaderEpoch
= 1
- val tp = new TopicPartition(topic, topicPartition)
- val partition = replicaManager.createPartition(tp)
+ val partition = replicaManager.createPartition(topicPartition)
val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
offsetCheckpoints, None)
- partition.makeFollower(
- leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId,
aliveBrokerIds),
- offsetCheckpoints,
- None)
+ val followerDelta = topicsCreateDelta(startId = followerBrokerId,
isStartIdLeader = false, partitions = List(topicPartition.partition()),
List.empty, topic, topicIds(topic), leaderEpoch)
+ replicaManager.applyDelta(followerDelta,
imageFromTopics(followerDelta.apply()))
+
+ // Verify log created and partition is hosted
+ val localLog = replicaManager.localLog(topicPartition)
+ assertTrue(localLog.isDefined, "Log should be created for follower after
applyDelta")
+ val hostedPartition = replicaManager.getPartition(topicPartition)
+ assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
// Make local partition a follower - because epoch increased by more
than 1, truncation should
// trigger even though leader does not change
leaderEpoch += leaderEpochIncrement
- val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(
- controllerId, controllerEpoch, brokerEpoch,
- Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId,
aliveBrokerIds)).asJava,
- Collections.singletonMap(topic, topicId),
- Set(new Node(followerBrokerId, "host1", 0),
- new Node(leaderBrokerId, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(correlationId,
leaderAndIsrRequest0,
- (_, followers) => assertEquals(followerBrokerId,
followers.head.partitionId))
+ val epochJumpDelta = topicsCreateDelta(startId = followerBrokerId,
isStartIdLeader = false, partitions = List(topicPartition.partition()),
List.empty, topic, topicIds(topic), leaderEpoch)
+ replicaManager.applyDelta(epochJumpDelta,
imageFromTopics(epochJumpDelta.apply()))
+
assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
// Truncation should have happened once
if (expectTruncation) {
Review Comment:
Please also update the code comments for this method related to
expectTruncation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]