gharris1727 commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1607071132
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
Review Comment:
   This has the effect of hiding the later debug logs for the other tasks; is that intentional? I don't know whether anyone is relying on that information, but this takes away debug output that might be useful.
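   To illustrate the alternative, here is a minimal, self-contained sketch (with plain collections standing in for the real `ClusterConfigState`/`ConnectorTaskId` types, which are not reproduced here): scan every task and accumulate the result instead of exiting the loop early, so the per-task debug output is preserved for all changed indices:

```java
import java.util.List;
import java.util.Map;

public class TaskDiffSketch {
    // Stand-in for the herder logic: compare each new task config against
    // the stored one WITHOUT breaking out of the loop on the first change,
    // so a message is emitted for every changed task index.
    public static boolean taskConfigsChanged(List<Map<String, String>> stored,
                                             List<Map<String, String>> updated) {
        if (stored.size() != updated.size())
            return true;
        boolean result = false;
        for (int index = 0; index < stored.size(); index++) {
            if (!updated.get(index).equals(stored.get(index))) {
                System.out.println("change in configuration for task " + index);
                result = true; // remember the change, but keep scanning
            }
        }
        return result;
    }
}
```

   The cost is one extra map comparison per unchanged task, which is negligible next to the value of complete debug logs.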
##########
checkstyle/import-control.xml:
##########
@@ -574,6 +574,7 @@
<!-- for annotations to avoid code duplication -->
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />
+ <allow pkg="com.fasterxml.jackson.databind" />
Review Comment:
nit: duplicate
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
Review Comment:
   None of this looks expensive to compute. WDYT about moving it outside the `else` branch and always comparing the hash?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -243,4 +249,29 @@ public static Map<String, String> patchConfig(
         });
         return result;
     }
+
+    /**
+     * Generate a deterministic hash of the supplied config. For configurations
+     * with identical key-value pairs, this hash will always be the same.
+     * @param config the config to hash; may be null
+     * @return a hash of the config
+     */
+    public static int configHash(Map<String, String> config) {
+        if (config == null)
+            return 0;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);
Review Comment:
   AbstractMap (the superclass of TreeMap) has a hashCode implementation that depends on the keys and values in the map. Did you consider using that method, and reject it? It looks like it would involve fewer memory allocations and less copying.
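   For illustration, a sketch of what that alternative could look like (hypothetical class name, not the PR's actual code). The `java.util.Map.hashCode()` contract defines the result as the sum of the entries' hash codes, so two maps with the same key-value pairs hash identically regardless of Map implementation or iteration order, with no serialization step at all:

```java
import java.util.Map;
import java.util.TreeMap;

public class MapHashSketch {
    // Hypothetical alternative to JSON-serializing the config:
    // Map.hashCode() is specified as the sum of the entries' hash codes
    // (each entry hashing as key.hashCode() ^ value.hashCode()), so equal
    // maps always produce equal hashes, even across Map implementations.
    public static int configHash(Map<String, String> config) {
        return config == null ? 0 : config.hashCode();
    }

    public static void main(String[] args) {
        Map<String, String> a = new TreeMap<>();
        a.put("tasks.max", "2");
        a.put("topics", "test-topic");
        Map<String, String> b = Map.of("topics", "test-topic", "tasks.max", "2");
        System.out.println(configHash(a) == configHash(b)); // prints: true
    }
}
```

   Because `String.hashCode()` is itself specified by the JLS, the result is also stable across JVMs; the remaining caveat is the same one the JSON approach has, namely that any 32-bit hash can collide.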
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -187,6 +193,17 @@ public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
         return taskConfigs.get(task);
     }
+    /**
+     * Get the hash of the connector config that was used to generate the
+     * latest set of task configs for the connector
+     * @param connectorName name of the connector
+     * @return the config hash, or null if the connector does not exist or
+     *         no config hash for its latest set of tasks has been stored
+     */
+    public Integer taskConfigHash(String connectorName) {
Review Comment:
I think this should be called connectorConfigHash, since it's not specific
to any task.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);
Review Comment:
   This has the effect that the current behavior of not propagating reconfigurations will continue after an upgrade, until a task reconfiguration fires and passes one of the other conditions (a task count change, or an obvious task config change).
   While this is "backwards compatible", I think it has the negative effect of perpetuating the existing bad behavior even after the worker is upgraded, and then, at a seemingly arbitrary later time, irreversibly switching to the new behavior. I think this should set `result = true`, to "prime the pump" and eagerly upgrade connectors to have config hashes.
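   The suggested behavior can be sketched as a small self-contained helper (hypothetical class and method names, used here only to isolate the branch logic from the surrounding herder code):

```java
public class HashUpgradeSketch {
    // Sketch of the "prime the pump" suggestion: a missing stored hash is
    // itself treated as a change, so connectors created before the upgrade
    // are reconfigured once, eagerly, and get a hash written, instead of
    // keeping the old behavior until some unrelated change fires later.
    public static boolean hashChanged(Integer storedHash, int currentHash) {
        if (storedHash == null) {
            return true; // force one reconfiguration to persist a hash
        }
        return storedHash != currentHash; // unboxed value comparison
    }
}
```

   The tradeoff is a one-time restart of every pre-upgrade connector's tasks, in exchange for making the cutover to hash-based change detection immediate and predictable.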
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);
+                } else if (storedConnectorConfigHash != connectorConfigHash) {
+                    log.debug(
+                            "Connector {} has change in config hash ({}) for tasks ({})",
Review Comment:
   The hash is computed from potentially sensitive data supplied by ConfigProviders. Since the ConfigProviders, the ability to configure connectors, and the ability to read the config topic are all privileged resources, the security of the hash itself is mostly irrelevant. This log message, however, prints the hash to the less-privileged logfile, and that seems potentially dangerous.
   Assuming an attacker has access to the logs, they can potentially see all of the non-password configurations. With this log message, they would also be able to see the resulting hash, and they would know the precise hash algorithm from reading the source. An attacker could then guess-and-check secrets by inserting a candidate secret and running the hash algorithm.
   This would give an attacker a weak filter, allowing them to guess-and-check many more passwords offline than could actually be attempted against the vulnerable service (e.g. Kafka).
   I think we either need to treat the hash as sensitive information (keeping it out of the logs, etc.) or use a cryptographic hash instead, one that is expensive to compute and difficult to reverse.
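   To make the second option concrete, here is a sketch using `java.security.MessageDigest` (hypothetical class name; SHA-256 is chosen only as an example, and since it is fast, a keyed MAC whose key never reaches the logs, or a deliberately slow password hash, would resist offline guessing even better):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class CryptoConfigHash {
    // Sketch: digest the sorted key-value pairs with SHA-256 so a leaked
    // digest is preimage-resistant rather than a reversible 32-bit value.
    // NUL separators prevent ambiguity between ("ab","c") and ("a","bc").
    public static String configHash(Map<String, String> config) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            if (config != null) {
                for (Map.Entry<String, String> e : new TreeMap<>(config).entrySet()) {
                    digest.update(e.getKey().getBytes(StandardCharsets.UTF_8));
                    digest.update((byte) 0);
                    digest.update(e.getValue().getBytes(StandardCharsets.UTF_8));
                    digest.update((byte) 0);
                }
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest())
                hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // every conformant JRE must provide SHA-256
            throw new IllegalStateException(e);
        }
    }
}
```

   Note that preimage resistance alone does not stop the guess-and-check attack described above when the secret has low entropy; only keeping the value out of the logs, keying it, or slowing the hash does that.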
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]