C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1469698231
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -620,6 +650,11 @@ private boolean startTask(
         try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
             final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+            int maxTasks = connConfig.tasksMax();
+            int numTasks = configState.taskCount(id.connector());
+            checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
Review Comment:
This check handles the case where a connector generated too many tasks
before the worker was upgraded to a version that includes the changes from this
PR. We want to catch these kinds of cases because we may later develop features
(such as static assignment) that are incompatible with an excessive number of
tasks, so we can't wait until the connector generates new task configs to
perform the check (and potentially allow too many tasks to keep running in the
meantime).
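For concreteness, here's a minimal sketch of what a startup-time guard along these lines could look like. The names `checkTasksMax` and `enforceTasksMax` come from the diff above; the method body, the message text, and the choice of `ConnectException` are assumptions for illustration, not the PR's actual implementation:

```java
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TasksMaxGuard {
    private static final Logger log = LoggerFactory.getLogger(TasksMaxGuard.class);

    // Fails (or just warns, when enforcement is disabled) if a connector has already
    // generated more task configs than its tasks.max setting allows.
    static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
        if (numTasks <= maxTasks) {
            return;
        }
        String msg = "Connector " + connName + " has generated " + numTasks
                + " tasks, which exceeds its tasks.max value of " + maxTasks;
        if (enforce) {
            // Assumption: the real PR may use a more specific exception type here
            throw new ConnectException(msg);
        }
        log.warn(msg);
    }
}
```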
Regarding a stale config snapshot--great question, and it's taken me the
last half hour of reviewing `DistributedHerder` code to try to get to the
bottom of this, so please check my work.
I don't believe this should be a problem, because the leader includes the
latest offset it has read from the config topic during rebalances, and every
worker makes sure it has read at least that far into the config topic
before starting any newly-assigned connectors or tasks. See
`handleRebalanceCompleted`
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1761-L1764)
for the worker checking its config snapshot offset against the leader's,
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1776-L1787)
for the read-to-end if the worker is lagging, and
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1802)
for where newly-assigned connectors and tasks are
started. For the reporting of the latest offset read from the config topic
during rebalance, see `WorkerCoordinator::metadata`
[here](https://github.com/apache/kafka/blob/94ab8c16bae9b6e3847f2ad61656ab834a0ddb9e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java#L182),
where a fresh snapshot is taken right before the worker (re)joins the group.
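To make that sequencing concrete, here's an illustrative-only sketch (not the actual `DistributedHerder` code) of the ordering those snippets enforce, built around `ConfigBackingStore::refresh`; the class, constructor, and `startAssignedWork` helper are invented for the example:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.connect.storage.ConfigBackingStore;

class RebalanceSequencingSketch {
    private final ConfigBackingStore configBackingStore;
    private final long workerSyncTimeoutMs;
    private long localConfigOffset;

    RebalanceSequencingSketch(ConfigBackingStore configBackingStore, long initialConfigOffset, long workerSyncTimeoutMs) {
        this.configBackingStore = configBackingStore;
        this.localConfigOffset = initialConfigOffset;
        this.workerSyncTimeoutMs = workerSyncTimeoutMs;
    }

    // The leader reports the latest config offset it has read; a worker that is behind
    // reads the config topic to the end before starting anything it was just assigned.
    void onRebalanceCompleted(long leaderConfigOffset) throws TimeoutException {
        if (localConfigOffset < leaderConfigOffset) {
            // Behind the leader: read the config topic to the end and retake the snapshot.
            configBackingStore.refresh(workerSyncTimeoutMs, TimeUnit.MILLISECONDS);
            localConfigOffset = configBackingStore.snapshot().offset();
        }
        // Newly assigned connectors and tasks are only started after this point, so
        // startTask() never sees a snapshot older than the one the leader reported.
        startAssignedWork();
    }

    private void startAssignedWork() {
        // Stand-in for DistributedHerder::startWork.
    }
}
```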