chia7712 commented on code in PR #19535:
URL: https://github.com/apache/kafka/pull/19535#discussion_r2061429213
##########
core/src/main/scala/kafka/server/AbstractFetcherManager.scala:
##########
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
metricsGroup.newGauge("MaxLag", () => {
// current max lag across all fetchers/topics/partitions
fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
- val maxLagThread =
fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread,
lagMetrics) =>
+ val maxLagThread =
fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread,
lagMetrics) =>
Review Comment:
```scala
val maxLagThread =
fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v =>
v.lag).max().orElse(0L)
```
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition,
): Unit = {
if (isLeader) {
val followers = replicas.filter(_ != localBrokerId)
- val removedReplicas =
remoteReplicasMap.keys.filterNot(followers.contains(_))
+ val removedReplicas =
remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_))
// Due to code paths accessing remoteReplicasMap without a lock,
// first add the new replicas and then remove the old ones.
- followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new
Replica(id, topicPartition, metadataCache)))
- remoteReplicasMap.removeAll(removedReplicas)
+ followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new
Replica(id, topicPartition, metadataCache)))
+ remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection)
Review Comment:
```scala
remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica))
```
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig,
trace("Evaluating ISR list of partitions to see which replicas can be
removed from the ISR")
// Shrink ISRs for non offline partitions
- allPartitions.keys.foreach { topicPartition =>
+ allPartitions.keys.asScala.foreach { topicPartition =>
Review Comment:
```scala
allPartitions.forEach { (topicPartition, _) =>
onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
}
```
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}
def unregister(): Unit = {
- stats.keys.toBuffer.foreach { key: TopicPartition =>
+ stats.keys.asScala.toBuffer.foreach { key: TopicPartition =>
Review Comment:
```scala
stats.forEach((key, _) => unregister(key))
```
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition,
): Unit = {
if (isLeader) {
val followers = replicas.filter(_ != localBrokerId)
- val removedReplicas =
remoteReplicasMap.keys.filterNot(followers.contains(_))
+ val removedReplicas =
remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_))
// Due to code paths accessing remoteReplicasMap without a lock,
// first add the new replicas and then remove the old ones.
- followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new
Replica(id, topicPartition, metadataCache)))
- remoteReplicasMap.removeAll(removedReplicas)
+ followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new
Replica(id, topicPartition, metadataCache)))
Review Comment:
nit: the lambda parameter `k` is unused — rename it to `_`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]