Copilot commented on code in PR #20128:
URL: https://github.com/apache/kafka/pull/20128#discussion_r2236780174
##########
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala:
##########
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}
+ @Test
+ def testMigrationZnodeWithNullValue(): Unit = {
+ val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+ var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42,
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
+ zkClient.retryRequestUntilConnected(CreateRequest(
+ MigrationZNode.path,
+ null,
+ zkClient.defaultAcls(MigrationZNode.path),
+ CreateMode.PERSISTENT))
+
+ migrationState = zkClient.getOrCreateMigrationState(migrationState)
+
+ assertEquals(0, migrationState.migrationZkVersion())
Review Comment:
[nitpick] The variable `migrationState` is declared as `var` only because it is
reassigned once with the result of `getOrCreateMigrationState`. Consider using
`val` for the initial declaration and a separate `val` with a different name for
that result to improve code clarity.
```suggestion
val migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42,
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
zkClient.retryRequestUntilConnected(CreateRequest(
MigrationZNode.path,
null,
zkClient.defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT))
val updatedMigrationState =
zkClient.getOrCreateMigrationState(migrationState)
assertEquals(0, updatedMigrationState.migrationZkVersion())
```
##########
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala:
##########
@@ -1442,6 +1442,21 @@ class KafkaZkClientTest extends QuorumTestHarness {
} finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
}
+ @Test
+ def testMigrationZnodeWithNullValue(): Unit = {
+ val (controllerEpoch, stat) = zkClient.getControllerEpoch.get
+ var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42,
Time.SYSTEM.milliseconds(), -1, controllerEpoch, stat.getVersion)
Review Comment:
[nitpick] The magic numbers (3000, 42, 100, 42) make the test hard to
understand. Consider using named constants or variables to clarify what these
values represent.
```suggestion
val brokerId = 3000
val leaderEpoch = 42
val controllerEpochValue = 100
val zkVersion = 42
var migrationState = new ZkMigrationLeadershipState(brokerId,
leaderEpoch, controllerEpochValue, zkVersion, Time.SYSTEM.milliseconds(), -1,
controllerEpoch, stat.getVersion)
```
##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1734,20 +1734,27 @@ class KafkaZkClient private[zk] (
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK =>
- MigrationZNode.decode(getDataResponse.data,
getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+ Option(getDataResponse.data) match {
+ case Some(data) =>
+ MigrationZNode.decode(data, getDataResponse.stat.getVersion,
getDataResponse.stat.getMtime)
+ case None =>
+ createInitialMigrationState(initialState, removeFirst = true)
+ }
case Code.NONODE =>
createInitialMigrationState(initialState)
case _ => throw getDataResponse.resultException.get
}
}
- private def createInitialMigrationState(initialState:
ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
- val createRequest = CreateRequest(
+ private def createInitialMigrationState(initialState:
ZkMigrationLeadershipState, removeFirst: Boolean = false):
ZkMigrationLeadershipState = {
+ val createOp = CreateOp(
MigrationZNode.path,
MigrationZNode.encode(initialState),
defaultAcls(MigrationZNode.path),
CreateMode.PERSISTENT)
- val response = retryRequestUntilConnected(createRequest)
+ val deleteOp = DeleteOp(MigrationZNode.path, ZkVersion.MatchAnyVersion)
+ val multi = MultiRequest((if (removeFirst) Some(deleteOp) else None).toSeq
++ Seq(createOp))
Review Comment:
[nitpick] The conditional logic for building the MultiRequest operations is
complex and could be simplified. Consider extracting this into a clearer
variable assignment or using a more readable approach like building a list of
operations.
```suggestion
val operations = if (removeFirst) Seq(deleteOp, createOp) else
Seq(createOp)
val multi = MultiRequest(operations)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]