Yunyung commented on code in PR #20405:
URL: https://github.com/apache/kafka/pull/20405#discussion_r2300269437
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1568,6 +1589,7 @@ class TestMetricsReporter extends MetricsReporter with
Reconfigurable with Close
@volatile var closeCount = 0
@volatile var clusterUpdateCount = 0
@volatile var pollingInterval: Int = -1
+ @volatile var numFetchers: Int = 1
Review Comment:
Maybe changing it to = 0 makes it clearer that it will change
statically/dynamically.
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -1599,31 +1622,37 @@ class TestMetricsReporter extends MetricsReporter with
Reconfigurable with Close
}
override def reconfigurableConfigs(): util.Set[String] = {
- util.Set.of(PollingIntervalProp)
+ util.Set.of(PollingIntervalProp,
ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val pollingInterval = configs.get(PollingIntervalProp).toString.toInt
if (pollingInterval <= 0)
throw new ConfigException(s"Invalid polling interval $pollingInterval")
+
+ val numFetchers =
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+ if (numFetchers <= 0)
+ throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
reconfigureCount += 1
pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+ numFetchers =
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
}
override def close(): Unit = {
closeCount += 1
}
- def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval:
Int): Unit = {
+ def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval:
Int, numFetcher: Int = 1): Unit = {
Review Comment:
```suggestion
def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval:
Int, numFetcher: Int): Unit = {
```
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -947,6 +947,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
// Add a new metrics reporter
val newProps = new Properties
newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
+ newProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
Review Comment:
Should we change all verifyState calls in this method to numFetcher = 1?
It's easier to understand, IMO.
Like reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0,
pollingInterval = 100, **numFetcher=1**))
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]