C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1608663678
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
Review Comment:
Ah, wasn't thinking about the impact on logging. Yep, will revert.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
Review Comment:
I was thinking it would be redundant in most of those cases, since a connector config change would likely have already happened. By emitting this line less frequently, we guarantee that when it does appear, a change in the config hash was the sole cause of the task restarts. Does that seem valuable?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);
Review Comment:
Yes, this is covered in the description:
> If no hash has been written to the config topic yet, then we do not compare hashes. This is done in order to prevent upgrades to newer workers from causing all tasks on the cluster to be immediately restarted.

I believe this is worth the tradeoff. Upgrades can be delicate, and we should be careful about causing a flurry of task restarts during them. This bug only affects a small number of connectors, and as soon as a new connector config is written on an upgraded cluster (which is likely to happen if people run into the bug in the first place), it will automatically be resolved.
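To illustrate, here is a minimal sketch of the skip-if-absent comparison described above (the class and method names are mine for illustration, not the actual `AbstractHerder` code):

```java
public class ConfigHashCheck {
    // Illustrative sketch only: a missing stored hash (pre-upgrade data in
    // the config topic) means the comparison is skipped entirely rather than
    // being treated as a change.
    static boolean configHashChanged(Integer storedHash, int currentHash) {
        if (storedHash == null) {
            // No hash written yet; skip the check so that a worker upgrade
            // alone never triggers a fleet-wide task restart.
            return false;
        }
        return storedHash != currentHash;
    }

    public static void main(String[] args) {
        System.out.println(configHashChanged(null, 42)); // false: no stored hash, skip
        System.out.println(configHashChanged(7, 42));    // true: hash differs
        System.out.println(configHashChanged(42, 42));   // false: hash unchanged
    }
}
```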
Also, with eager assignment, there's a potential for an infinite rebalance
loop during upgrades if we apply the proposed change. Assume there is a cluster
with at least two nodes, some non-leader worker `W`, and some connector `C`:
- `W` is the first worker in the cluster to get upgraded
- `W` receives its assignment, including connector `C`
- `W` computes the hash for the latest config of `C`, sees that no existing
hash has been stored for `C`, and forwards a request to the leader to write
task configs for `C`
- The leader writes the task configs, but since it has not been upgraded
yet, does not include a config hash
- Since new task configs have been written to the config topic, a rebalance
is triggered
- After the rebalance, `W` is again assigned a connector (it can be `C`, or
a different connector)
- `W` again sees that no hash is present in the config topic for that
connector
- The process then repeats
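To make the loop concrete, here is a toy simulation of the scenario above (heavily simplified: one worker, one connector, and a made-up hash value), showing that under the proposed change the cycle only ends once the leader itself has been upgraded:

```java
public class RebalanceLoopSketch {
    // Returns how many rebalances occur before a hash finally lands in the
    // config topic, given the round at which the leader gets upgraded.
    // Purely illustrative; not actual Connect runtime code.
    static int rebalancesUntilStable(int leaderUpgradeRound) {
        boolean leaderUpgraded = false;
        Integer storedHash = null;
        int rebalances = 0;
        // Upgraded worker W keeps forwarding task configs to the leader as
        // long as no hash is stored for its assigned connector.
        while (storedHash == null) {
            // The leader writes the task configs, but only an upgraded
            // leader knows how to include the connector config hash.
            storedHash = leaderUpgraded ? 123 : null;
            rebalances++; // each write to the config topic triggers a rebalance
            if (rebalances == leaderUpgradeRound) {
                leaderUpgraded = true; // simulate the leader upgrading here
            }
        }
        return rebalances;
    }

    public static void main(String[] args) {
        // The later the leader upgrades, the longer the loop runs.
        System.out.println(rebalancesUntilStable(1)); // 2
        System.out.println(rebalancesUntilStable(5)); // 6
    }
}
```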
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]