C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1608664359


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -187,6 +193,17 @@ public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
         return taskConfigs.get(task);
     }
 
+    /**
+     * Get the hash of the connector config that was used to generate the
+     * latest set of task configs for the connector
+     * @param connectorName name of the connector
+     * @return the config hash, or null if the connector does not exist or
+     * no config hash for its latest set of tasks has been stored
+     */
+    public Integer taskConfigHash(String connectorName) {

Review Comment:
   True, but that would imply that it's the hash of the latest connector 
config, which is not always the case (and this feature is only valuable when it 
is not the case). I juggled several names, including `tasksConnectorConfigHash` 
and `connectorConfigHash`, and tried to settle on something that was brief and 
least inaccurate.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -243,4 +249,29 @@ public static Map<String, String> patchConfig(
         });
         return result;
     }
+
+    /**
+     * Generate a deterministic hash of the supplied config. For configurations
+     * with identical key-value pairs, this hash will always be the same.
+     * @param config the config to hash; may be null
+     * @return a hash of the config
+     */
+    public static int configHash(Map<String, String> config) {
+        if (config == null)
+            return 0;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);

Review Comment:
   I didn't want to rely on hash code implementations being identical across 
JVM restarts or even different JVMs. If there's an official spec somewhere that 
applies to all of our supported Java versions that guarantees that 
`AbstractMap::hashCode` or some other hash implementation will be the same 
everywhere, then I agree it'd be better to use that.
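
   One way to avoid depending on any `hashCode` implementation is to feed a 
canonical byte encoding of the sorted entries to a checksum with a published 
definition. A minimal sketch, assuming a hypothetical `StableConfigHash` 
helper that is not part of this PR, and using `java.util.zip.CRC32` purely for 
illustration rather than as the function the PR uses:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical helper for illustration only; not part of Kafka Connect.
public class StableConfigHash {

    // Hashes a config without calling hashCode() on the map, its keys, or its values.
    public static int stableHash(Map<String, String> config) {
        if (config == null)
            return 0;

        CRC32 crc = new CRC32();
        // TreeMap iterates in sorted key order, so insertion order cannot affect the result.
        for (Map.Entry<String, String> entry : new TreeMap<>(config).entrySet()) {
            feed(crc, entry.getKey());
            feed(crc, entry.getValue());
        }
        // CRC32 returns an unsigned 32-bit value in a long; the cast keeps the low 32 bits.
        return (int) crc.getValue();
    }

    // Writes a presence marker, a 4-byte length, and the UTF-8 bytes, so that
    // ("ab", "c") and ("a", "bc") produce different byte streams.
    private static void feed(CRC32 crc, String s) {
        if (s == null) {
            crc.update(0);
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        crc.update(1);
        crc.update(ByteBuffer.allocate(4).putInt(bytes.length).array());
        crc.update(bytes);
    }
}
```

   The explicit UTF-8 encoding and the sorted iteration order are what make 
the result independent of the JVM; a 32-bit checksum can still collide, so 
equal hashes do not prove equal configs.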



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);
+                } else if (storedConnectorConfigHash != connectorConfigHash) {
+                    log.debug(
+                            "Connector {} has change in config hash ({}) for tasks ({})",

Review Comment:
   I'd like to treat it as sensitive information and not log above `TRACE` 
level if possible (we allow record contents and other potentially-sensitive 
details to be logged at this level). I've added a note to a few Javadocs making 
this clear. Does this seem sufficient? If not, I can look into alternative hash 
functions.
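
   The level split described above could look roughly like the following. 
This is only a sketch under stated assumptions: `HashLogging` and 
`hashChanged` are hypothetical names, `java.util.logging` stands in for the 
logging API Connect actually uses (`FINE` playing the role of `DEBUG` and 
`FINEST` the role of `TRACE`), and the handling of a missing stored hash is 
an illustrative choice, not the PR's behavior:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration only; not part of Kafka Connect.
public class HashLogging {
    private static final Logger LOG = Logger.getLogger(HashLogging.class.getName());

    // Reports whether the stored hash differs from the current one.
    // The hash values themselves are only ever logged at the most verbose level.
    public static boolean hashChanged(String connName, Integer stored, int current) {
        if (stored == null) {
            LOG.log(Level.FINE, "Connector {0} has no config hash stored for its existing tasks", connName);
            return false; // assumption for this sketch: nothing to compare against
        }
        if (stored != current) { // Integer is unboxed here, so this compares values
            LOG.log(Level.FINE, "Connector {0} has change in config hash for tasks", connName);
            LOG.log(Level.FINEST, "Connector {0} config hash changed from {1} to {2}",
                    new Object[] {connName, String.valueOf(stored), String.valueOf(current)});
            return true;
        }
        return false;
    }
}
```

   With this shape, an operator running at the `DEBUG` equivalent sees that a 
change was detected, while the values only appear once the most verbose level 
is enabled.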



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
