gharris1727 commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1449550586
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -620,6 +650,11 @@ private boolean startTask(
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+
+ int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
Review Comment:
nit: make a `tasksMax` getter?
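
A minimal sketch of what such a getter could look like. `ConnectorConfigSketch` and its map-backed `getInt` are hypothetical stand-ins so the snippet compiles on its own; the real `ConnectorConfig` gets `getInt` from `AbstractConfig`.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for ConnectorConfig (assumption: not the real class).
class ConnectorConfigSketch {
    static final String TASKS_MAX_CONFIG = "tasks.max";

    private final Map<String, String> props;

    ConnectorConfigSketch(Map<String, String> props) {
        this.props = props;
    }

    // Stand-in for AbstractConfig#getInt, only here to keep the sketch self-contained.
    int getInt(String key) {
        return Integer.parseInt(props.get(key));
    }

    // The suggested getter: callers no longer need to know the config key.
    int tasksMax() {
        return getInt(TASKS_MAX_CONFIG);
    }
}
```

With something like this, the call site above would shrink to `int maxTasks = connConfig.tasksMax();`.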
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
Connector connector = workerConnector.connector();
Review Comment:
I didn't realize that this was the only place where the Connector escapes the
WorkerConnector and a different thread interacts with the Connector object.
I know taskConfigs typically returns almost instantly, but it might make
sense to eventually move this call to the WorkerConnector thread instead of
the herder tick thread.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -131,9 +133,26 @@ public void run() {
}
}
+ /**
+ * Fail the connector.
+ * @param cause the cause of the failure; if null, the connector will not be failed
+ */
+ public void fail(Throwable cause) {
+ synchronized (this) {
+ if (this.externalFailure != null)
+ return;
+ this.externalFailure = cause;
+ notify();
+ }
+ }
+
void doRun() {
initialize();
while (!stopping) {
+ Throwable failure = externalFailure;
+ if (failure != null)
+ onFailure(failure);
Review Comment:
Is this in danger of being called more than once, particularly if the
connector has already been failed this way and then a pause/resume request
comes in? Is that a bad thing?

It looks like the connector thread just waits in this loop until something
external calls shutdown(), so I would expect onFailure to be called again
every time someone calls notify() on the worker connector thread.
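
If repeated calls do turn out to be a problem, one option is a guard flag so the failure is reported only once. This is a rough sketch under that assumption, not the PR's code: `FailOnceSketch`, `failureReported`, `checkExternalFailure`, and the `onFailureCalls` counter are all hypothetical names.

```java
// Hypothetical, stripped-down model of the fail()/doRun() interaction.
class FailOnceSketch {
    private Throwable externalFailure; // set by fail(), guarded by "this"
    private boolean failureReported;   // hypothetical guard flag
    int onFailureCalls = 0;            // only here to make the behavior observable

    // Mirrors the fail() method in the diff: the first failure wins.
    public synchronized void fail(Throwable cause) {
        if (externalFailure != null) {
            return;
        }
        externalFailure = cause;
        notify();
    }

    // Stands in for the check at the top of each doRun() loop iteration.
    synchronized void checkExternalFailure() {
        if (externalFailure != null && !failureReported) {
            failureReported = true;
            onFailure(externalFailure);
        }
    }

    void onFailure(Throwable t) {
        onFailureCalls++;
    }
}
```

Here the loop can wake up any number of times (for example on a pause/resume request) and `onFailure` still runs once.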
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -131,9 +133,26 @@ public void run() {
}
}
+ /**
+ * Fail the connector.
+ * @param cause the cause of the failure; if null, the connector will not be failed
+ */
+ public void fail(Throwable cause) {
+ synchronized (this) {
+ if (this.externalFailure != null)
+ return;
+ this.externalFailure = cause;
+ notify();
+ }
+ }
+
void doRun() {
initialize();
while (!stopping) {
+ Throwable failure = externalFailure;
+ if (failure != null)
Review Comment:
nit: curly braces
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -391,7 +391,15 @@ public List<Map<String, String>> connectorTaskConfigs(String connName, Connector
Connector connector = workerConnector.connector();
try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
String taskClassName = connector.taskClass().getName();
- for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
+ try {
+ checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax());
+ } catch (TooManyTasksException e) {
+ // TODO: This control flow is awkward. Push task config generation into WorkerConnector class?
Review Comment:
This makes sense to me; checkTasksMax could be public static, right?
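
Roughly what I have in mind, as a sketch only. The parameter list follows the call in the diff; the class name, the `TooManyTasksExceptionSketch` type, the message text, and the warn-instead-of-throw behavior when enforcement is off are my assumptions, not the PR's actual implementation.

```java
// Hypothetical stand-in for the PR's TooManyTasksException.
class TooManyTasksExceptionSketch extends RuntimeException {
    TooManyTasksExceptionSketch(String message) {
        super(message);
    }
}

class TasksMaxCheck {
    // Stateless, so it can be public static and called from Worker or WorkerConnector.
    public static void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
        if (numTasks <= maxTasks) {
            return;
        }
        String message = "Connector " + connName + " generated " + numTasks
                + " task configs, which exceeds tasks.max=" + maxTasks;
        if (enforce) {
            throw new TooManyTasksExceptionSketch(message);
        }
        // Assumed behavior when enforcement is disabled: warn and carry on.
        System.err.println("WARN " + message);
    }
}
```

Since the check only depends on its arguments, moving task config generation into WorkerConnector later would not require changing it.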