C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1608663678


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {

Review Comment:
   Ah, wasn't thinking about the impact on logging. Yep, will revert.
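   To illustrate the logging impact being discussed, here is a hypothetical standalone sketch (illustrative names, not the actual Kafka code): short-circuiting the loop with `&& !result` stops at the first changed task, so subsequent changed tasks never get their own debug line, whereas the original full scan logs every changed task before returning.

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: compares how many per-task "changed" log lines are
   // emitted with and without short-circuiting the scan on the first change.
   public class LoopLoggingSketch {
       static int changesLogged(List<Boolean> taskChanged, boolean shortCircuit) {
           int logged = 0;
           boolean result = false;
           // With shortCircuit, the loop exits as soon as result becomes true,
           // mirroring "index < currentNumTasks && !result".
           for (int index = 0; index < taskChanged.size() && !(shortCircuit && result); index++) {
               if (taskChanged.get(index)) {
                   logged++; // stands in for the per-task log.debug(...) call
                   result = true;
               }
           }
           return logged;
       }

       public static void main(String[] args) {
           List<Boolean> changed = Arrays.asList(false, true, true);
           System.out.println(changesLogged(changed, true));  // short-circuit: only the first change is logged
           System.out.println(changesLogged(changed, false)); // full scan: every change is logged
       }
   }
   ```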



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {

Review Comment:
   I was thinking it would be redundant in most of those cases since a 
connector config change would likely have already happened. By emitting this 
line less frequently, we guarantee that it indicates that a change in config 
hash was the sole cause of task restarts. Does that seem valuable?
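   The ordering being proposed can be sketched as follows (a hypothetical standalone illustration, not the actual Kafka code): because the hash comparison only runs when no per-task config diff was found, the hash-mismatch log line can only ever mean that the hash change was the sole trigger for restarting tasks.

   ```java
   // Hypothetical sketch: the config-hash check is reached only when the
   // per-task comparison found no differences, so its log line unambiguously
   // identifies a hash change as the single cause of a restart.
   public class HashCheckOrderingSketch {
       static String restartReason(boolean taskConfigsDiffer, int storedHash, int currentHash) {
           if (taskConfigsDiffer) {
               // Already logged per-task in the loop; hash check is skipped.
               return "task configs changed";
           }
           if (storedHash != currentHash) {
               // Emitted only here, so it is never redundant with the loop's logs.
               return "connector config hash changed";
           }
           return "no change";
       }

       public static void main(String[] args) {
           System.out.println(restartReason(true, 1, 2));  // task configs changed
           System.out.println(restartReason(false, 1, 2)); // connector config hash changed
           System.out.println(restartReason(false, 1, 1)); // no change
       }
   }
   ```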



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);

Review Comment:
   Yes, this is covered in the description:
   
   > If no hash has been written to the config topic yet, then we do not 
compare hashes. This is done in order to prevent upgrades to newer workers from 
causing all tasks on the cluster to be immediately restarted.
   
   I believe this is worth the tradeoff. Upgrades can be delicate, and we should be careful about causing a flurry of task restarts during them. This bug only affects a small number of connectors, and as soon as a new connector config is written on an upgraded cluster (which is likely to happen if people run into the bug in the first place), the issue will automatically be resolved.
   
   Also, with eager assignment, there's a potential for an infinite rebalance 
loop during upgrades if we apply the proposed change. Assume there is a cluster 
with at least two nodes, some non-leader worker `W`, and some connector `C`:
   - `W` is the first worker in the cluster to get upgraded
   - `W` receives its assignment, including connector `C`
   - `W` computes the hash for the latest config of `C`, sees that no existing 
hash has been stored for `C`, and forwards a request to the leader to write 
task configs for `C`
   - The leader writes the task configs, but since it has not been upgraded 
yet, does not include a config hash
   - Since new task configs have been written to the config topic, a rebalance 
is triggered
   - After the rebalance, `W` is again assigned a connector (it can be `C`, or 
a different connector)
   - `W` again sees that no hash is present in the config topic for that 
connector
   - The process then repeats
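   The upgrade guard described above can be sketched like this (a hypothetical standalone illustration with invented names, not the actual Kafka code): a `null` stored hash means the task configs were written by a pre-upgrade worker, so the comparison is skipped instead of forcing a restart, which also breaks the rebalance loop in the eager-assignment scenario.

   ```java
   // Hypothetical sketch: skip the hash comparison entirely when no hash has
   // been written to the config topic yet, so rolling upgrades do not restart
   // every task on the cluster.
   public class UpgradeGuardSketch {
       static boolean hashRequiresRestart(Integer storedHash, int currentHash) {
           if (storedHash == null) {
               // Task configs predate hash-aware workers; do not treat the
               // missing hash as a change.
               return false;
           }
           return storedHash != currentHash;
       }

       public static void main(String[] args) {
           System.out.println(hashRequiresRestart(null, 42)); // false: upgrade in progress, no restart
           System.out.println(hashRequiresRestart(41, 42));   // true: hash genuinely changed
           System.out.println(hashRequiresRestart(42, 42));   // false: hashes match
       }
   }
   ```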



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
