gharris1727 commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1607071132


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {

Review Comment:
   This ends the loop at the first changed task, which hides the later debug 
logs for other tasks. Is that intentional? I don't know if anyone relies on 
that information, but this takes away debug information that might be useful.
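
For illustration, a minimal sketch of a comparison that visits every task, so each changed task can still be logged individually. The class and method names are hypothetical and the types are simplified to plain lists of maps; this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class TaskConfigDiff {
    /**
     * Hypothetical helper: returns the index of every task whose proposed config
     * differs from the stored one. Assumes both lists have the same size, since
     * the task count check would run before this comparison.
     */
    static List<Integer> changedTaskIndices(
            List<Map<String, String>> proposed,
            List<Map<String, String>> stored
    ) {
        List<Integer> changed = new ArrayList<>();
        // No early exit: every index is compared, so every change can be reported
        for (int index = 0; index < proposed.size(); index++) {
            if (!proposed.get(index).equals(stored.get(index))) {
                changed.add(index);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Map<String, String>> stored = Arrays.asList(
                Collections.singletonMap("k", "a"),
                Collections.singletonMap("k", "b"),
                Collections.singletonMap("k", "c"));
        List<Map<String, String>> proposed = Arrays.asList(
                Collections.singletonMap("k", "a"),
                Collections.singletonMap("k", "x"),
                Collections.singletonMap("k", "y"));
        // Tasks 1 and 2 both differ, and both are reported
        System.out.println(changedTaskIndices(proposed, stored)); // prints [1, 2]
    }
}
```

The overall result would then be `!changed.isEmpty()`, with one debug line per entry.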



##########
checkstyle/import-control.xml:
##########
@@ -574,6 +574,7 @@
       <!-- for annotations to avoid code duplication -->
       <allow pkg="com.fasterxml.jackson.annotation" />
       <allow pkg="com.fasterxml.jackson.databind" />
+      <allow pkg="com.fasterxml.jackson.databind" />

Review Comment:
   nit: duplicate



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {

Review Comment:
   None of this looks expensive to compute. WDYT about moving this outside the 
`else` branch and always comparing the hash?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -243,4 +249,29 @@ public static Map<String, String> patchConfig(
         });
         return result;
     }
+
+    /**
+     * Generate a deterministic hash of the supplied config. For configurations
+     * with identical key-value pairs, this hash will always be the same.
+     * @param config the config to hash; may be null
+     * @return a hash of the config
+     */
+    public static int configHash(Map<String, String> config) {
+        if (config == null)
+            return 0;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);

Review Comment:
   AbstractMap (superclass of TreeMap) has a hashCode implementation which 
depends on the keys and values in the map. Did you consider using that method 
and reject it? It looks like it would need fewer memory allocations and less 
copying.
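
As a rough illustration of that alternative: the `Map.hashCode()` contract defines the result as the sum of the entry hash codes, so it does not depend on iteration order or on the map implementation, and `String.hashCode()` is itself specified. The sketch below uses a hypothetical `mapHash` helper and made-up config values; it is not the PR's `configHash`. It remains a 32-bit hash, so collisions are possible.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class MapHashCodeDemo {
    /** Hypothetical alternative: rely on Map.hashCode() instead of serializing a sorted copy. */
    static int mapHash(Map<String, String> config) {
        return config == null ? 0 : config.hashCode();
    }

    public static void main(String[] args) {
        // Same entries, inserted in different orders (values are made up for the example)
        Map<String, String> a = new LinkedHashMap<>();
        a.put("connector.class", "ExampleSource");
        a.put("tasks.max", "2");

        Map<String, String> b = new LinkedHashMap<>();
        b.put("tasks.max", "2");
        b.put("connector.class", "ExampleSource");

        System.out.println(mapHash(a) == mapHash(b));                // prints true
        System.out.println(mapHash(a) == mapHash(new TreeMap<>(a))); // prints true
    }
}
```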



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -187,6 +193,17 @@ public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
         return taskConfigs.get(task);
     }
 
+    /**
+     * Get the hash of the connector config that was used to generate the
+     * latest set of task configs for the connector
+     * @param connectorName name of the connector
+     * @return the config hash, or null if the connector does not exist or
+     * no config hash for its latest set of tasks has been stored
+     */
+    public Integer taskConfigHash(String connectorName) {

Review Comment:
   I think this should be called connectorConfigHash, since it's not specific 
to any task.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);

Review Comment:
   With this branch, the current behavior of not propagating reconfigurations 
continues after an upgrade until a task reconfiguration fires and passes one 
of the other conditions (task count change, obvious task config change).
   
   While this is "backwards compatible", I think it has the negative effect of 
perpetuating the existing bad behavior even after the worker is upgraded, and 
then, at a seemingly arbitrary later time, irreversibly switching to the new 
behavior. I think this should set `result = true` to "prime the pump" and 
eagerly upgrade connectors to have config hashes.
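
A minimal sketch of that suggestion, assuming the stored hash is an `Integer` that is null when nothing has been recorded yet. The class and method names are hypothetical, not part of the PR.

```java
class ConfigHashCheck {
    /**
     * Hypothetical helper: decides whether the connector config hash counts as changed.
     *
     * @param storedHash  hash stored with the latest task configs, or null if none was stored
     *                    (for example, task configs written before the upgrade)
     * @param currentHash hash of the connector config being evaluated now
     */
    static boolean connectorConfigHashChanged(Integer storedHash, int currentHash) {
        if (storedHash == null) {
            // No hash recorded yet: report a change so the next task config write stores one
            return true;
        }
        return storedHash.intValue() != currentHash;
    }

    public static void main(String[] args) {
        System.out.println(connectorConfigHashChanged(null, 42)); // prints true
        System.out.println(connectorConfigHashChanged(42, 42));   // prints false
        System.out.println(connectorConfigHashChanged(41, 42));   // prints true
    }
}
```

The trade-off is one extra round of task config writes per connector after the upgrade, in exchange for the new behavior taking effect at a predictable time.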



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);
+                } else if (storedConnectorConfigHash != connectorConfigHash) {
+                    log.debug(
+                            "Connector {} has change in config hash ({}) for tasks ({})",

Review Comment:
   The hash is computed from potentially sensitive data from ConfigProviders. 
Since the ConfigProviders, connector configuration, and the config topic are 
all privileged resources, the security of the hash is mostly irrelevant there. 
This log message, however, prints the hash to the less-privileged log file, 
and that seems potentially dangerous.
   
   Assuming that an attacker has access to the logs, they can potentially see 
all of the non-password configurations. With this log message, they would also 
be able to see the resulting hash, and they would know the precise hash 
algorithm from reading the source. An attacker can then guess-and-check the 
secrets by inserting a test secret and running the hash algorithm.
   
   This would give an attacker a weak filter, allowing them to guess-and-check 
many more passwords offline than actually make it to the vulnerable service 
(e.g. Kafka).
   
   I think we either need to treat the hash as sensitive information (keeping 
it out of the logs, etc.) or instead use a cryptographic hash that is expensive 
to compute and difficult to reverse.
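
To make the second option concrete, here is a rough sketch using PBKDF2 from the JDK (`PBKDF2WithHmacSHA256`) over a canonical, sorted rendering of the config. Everything specific here is an assumption for illustration: the class name, the iteration count, the canonical encoding, and where the salt would come from. Whether a slow hash is acceptable on this code path is a separate question.

```java
import java.security.GeneralSecurityException;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

class SlowConfigHash {
    // Illustrative values only; a real choice would need tuning and review
    private static final int ITERATIONS = 100_000;
    private static final int KEY_BITS = 256;

    /**
     * Hypothetical helper: derives a deliberately expensive hash of a config.
     * Assumes non-null keys; the salt source is left to the caller.
     */
    static String hash(Map<String, String> config, byte[] salt) {
        // Canonical form: sorted by key, with length prefixes so field boundaries are unambiguous.
        // The fixed "v1|" tag also keeps the input non-empty for an empty config.
        StringBuilder canonical = new StringBuilder("v1|");
        for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
            String key = e.getKey();
            String value = String.valueOf(e.getValue());
            canonical.append(key.length()).append(':').append(key)
                     .append(value.length()).append(':').append(value);
        }
        PBEKeySpec spec = new PBEKeySpec(canonical.toString().toCharArray(), salt, ITERATIONS, KEY_BITS);
        try {
            byte[] derived = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec)
                    .getEncoded();
            return Base64.getEncoder().encodeToString(derived);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is not available on this JVM", e);
        } finally {
            // Drop the copy of the config text held by the key spec
            spec.clearPassword();
        }
    }
}
```

Each offline guess then costs the attacker the full iteration count. The first option, keeping the hash out of the logs entirely, avoids the extra computation.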



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
