showuon commented on code in PR #16653:
URL: https://github.com/apache/kafka/pull/16653#discussion_r1699435878


##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -68,25 +68,62 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     }
 
     val logs = logManager.logsByTopic(topic)
-    val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val wasCopyDisabled = logs.exists(_.config.remoteCopyDisabled())
 
-    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+    // kafkaController is only defined in Zookeeper's mode
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
+      wasRemoteLogEnabled, kafkaController.isDefined)
+    maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, 
wasCopyDisabled)
   }
 
-  private[server] def maybeBootstrapRemoteLogComponents(topic: String,
-                                                        logs: Seq[UnifiedLog],
-                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+  private[server] def maybeUpdateRemoteLogComponents(topic: String,
+                                                     logs: Seq[UnifiedLog],
+                                                     wasRemoteLogEnabled: 
Boolean,
+                                                     wasCopyDisabled: 
Boolean): Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val isCopyDisabled = logs.exists(_.config.remoteCopyDisabled())
+    val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable())
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+
     // Topic configs gets updated incrementally. This check is added to 
prevent redundant updates.
-    if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+    // When remote log is enabled, or remote copy is enabled, we should create 
RLM tasks accordingly via `onLeadershipChange`.
+    if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && 
!isCopyDisabled))) {
       val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When copy disabled, we should stop leaderCopyRLMTask and 
followerRLMTask, but keep expirationTask
+    if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
+      replicaManager.remoteLogManager.foreach(rlm => {
+        rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
+        rlm.stopFollowerRLMTasks(followerPartitions.toSet.asJava)
+      })
+    }
+
+    // Disabling remote log storage on this topic
+    if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new 
java.util.HashSet[StopPartition]()
+      leaderPartitions.foreach(partition => {
+        // delete remote logs and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
+      })
+
+      followerPartitions.foreach(partition => {
+        // we need to cancel follower tasks and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
+      })
+
+      // update the log start offset to local log start offset for the leader 
replicas
+      logs.filter(log => leaderPartitions.find(p => 
p.equals(log.topicPartition)).isDefined)

Review Comment:
   Nice! Updated!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to