C0urante commented on code in PR #15180:
URL: https://github.com/apache/kafka/pull/15180#discussion_r1450925859
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -884,6 +890,158 @@ private void assertTimeoutException(Runnable operation,
String expectedStageDesc
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
}
+ /**
+ * Tests the logic around enforcement of the
+ * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
+ * property and how it can be toggled via the
+ * {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG tasks.max.enforce}
+ * property, following the test plan laid out in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
+ */
+ @Test
+ public void testTasksMaxEnforcement() throws Exception {
+ String configTopic = "tasks-max-enforcement-configs";
+ workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(
+ NUM_WORKERS,
+ "Initial group of workers did not start in time."
+ );
+
+ Map<String, String> connectorProps = defaultSourceConnectorProps(TOPIC_NAME);
+ int maxTasks = 1;
+ connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
+ int numTasks = 2;
+ connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
+ connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+ // A connector that generates excessive tasks will be failed with an expected error message
+ connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+ CONNECTOR_NAME,
+ 0,
+ "connector did not fail in time"
+ );
+
+ String expectedErrorSnippet = String.format(
+ "The connector %s has generated %d tasks, which is greater than %d, "
+ + "the maximum number of tasks it is configured to create. ",
+ CONNECTOR_NAME,
+ numTasks,
+ maxTasks
+ );
+ String errorMessage = connect.connectorStatus(CONNECTOR_NAME).connector().trace();
+ assertThat(errorMessage, containsString(expectedErrorSnippet));
+
+ // Stop all workers in the cluster
+ connect.workers().forEach(connect::removeWorker);
+
+ // Publish a set of too many task configs to the config topic, to simulate
Review Comment:
Okay, trying this again, it's still not looking so great. Even with a null
`WorkerConfigTransformer` and relying on the mutability of the `workerProps`
field, there's another hurdle in the way: a non-null
`ConfigBackingStore.UpdateListener` has to be supplied to the
`KafkaConfigBackingStore` instance, or it'll fail when trying to read to the
end of the config topic after a call to `putTaskConfigs`.
Implementing that interface, even with no-op methods, would nearly double the
verbosity of this part of the testing code. We could instead add `updateListener
!= null` checks to the `KafkaConfigBackingStore` class, but IMO it's not worth
removing that guardrail just to accommodate testing code.
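For anyone following along, here's a rough sketch of what that no-op listener would look like. Note this is written against a hypothetical, trimmed-down stand-in for the real `ConfigBackingStore.UpdateListener` interface (which declares more callbacks than shown here, which is exactly where the verbosity comes from); the class and method names are invented for illustration:

```java
import java.util.Collection;
import java.util.List;

public class NoOpListenerSketch {

    // Hypothetical, trimmed-down stand-in for ConfigBackingStore.UpdateListener;
    // the real Kafka Connect interface declares additional callbacks.
    interface UpdateListener {
        void onConnectorConfigUpdate(String connector);
        void onConnectorConfigRemove(String connector);
        void onTaskConfigUpdate(Collection<String> taskIds);
    }

    // A no-op implementation: every body is empty, so the listener exists
    // purely to satisfy the backing store's non-null requirement.
    static final UpdateListener NO_OP = new UpdateListener() {
        @Override public void onConnectorConfigUpdate(String connector) { }
        @Override public void onConnectorConfigRemove(String connector) { }
        @Override public void onTaskConfigUpdate(Collection<String> taskIds) { }
    };

    // Number of callbacks a no-op implementation must still override
    static int callbackCount() {
        return UpdateListener.class.getMethods().length;
    }

    public static void main(String[] args) {
        // Safe to invoke; nothing happens, which is the point
        NO_OP.onConnectorConfigUpdate("example-connector");
        NO_OP.onTaskConfigUpdate(List.of("example-connector-0"));
        System.out.println("no-op listener overrides " + callbackCount() + " callbacks");
    }
}
```

Even with this trimmed-down interface it's a dozen lines of pure boilerplate, and the real interface makes it worse, hence my reluctance to go this route in the test.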
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]