C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684858189
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws Exception {
)).all().get();
}
- StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
log.info("Bringing up connector with fresh slate; fencing should not be necessary");
connect.configureConnector(CONNECTOR_NAME, props);
- assertConnectorStarted(connectorStart);
- // Verify that the connector and its tasks have been able to start successfully
+
+ // Hack: There is a small chance that our recent ACL updates for the connector have
+ // not yet been propagated across the entire Kafka cluster, and that our connector
+ // will fail on startup when it tries to list the end offsets of the worker's offsets topic
+ // So, we implement some retry logic here to add a layer of resiliency in that case
+ waitForCondition(
+ () -> {
+ ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
+ if ("RUNNING".equals(status.connector().state())) {
+ return true;
+ } else if ("FAILED".equals(status.connector().state())) {
+ log.debug("Restarting failed connector {}", CONNECTOR_NAME);
+ connect.restartConnector(CONNECTOR_NAME);
+ }
+ return false;
+ },
+ 30_000,
Review Comment:
They're the same value, but we're retrying for slightly different reasons: delays in ACL propagation won't necessarily be caught during pre-flight connector config validation.
I've pulled this out into a separate `ACL_PROPAGATION_TIMEOUT_MS` value. Let me know what you think.
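The retry pattern in the diff above (poll the connector's status, restart it whenever it reports `FAILED`, and give up after a deadline), with the timeout pulled out into its own constant, can be sketched in plain Java as follows. This is a minimal, self-contained sketch under stated assumptions, not the code from the PR: `AclRetrySketch`, `ConnectorHandle`, and `awaitRunningWithRestarts` are hypothetical stand-ins for the embedded Connect cluster's `connectorStatus`/`restartConnector` calls and Kafka's `waitForCondition` test helper, and the 30-second value simply mirrors the `30_000` in the diff.

```java
import java.util.concurrent.TimeUnit;

public class AclRetrySketch {

    // Hypothetical constant: kept separate from other timeouts because it exists
    // for a different reason (ACL propagation), even if the value is the same today.
    static final long ACL_PROPAGATION_TIMEOUT_MS = 30_000L;

    /** Hypothetical minimal view of a connector under test (not a Kafka API). */
    public interface ConnectorHandle {
        String state();   // e.g. "RUNNING", "FAILED", "UNASSIGNED"
        void restart();
    }

    /**
     * Polls until the connector reports RUNNING, restarting it each time it
     * reports FAILED. Returns true on success, false if the deadline passes
     * or the waiting thread is interrupted.
     */
    static boolean awaitRunningWithRestarts(ConnectorHandle connector,
                                            long timeoutMs,
                                            long pollIntervalMs) {
        // nanoTime is monotonic, so wall-clock adjustments can't skew the deadline.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() - deadline < 0) {
            String state = connector.state();
            if ("RUNNING".equals(state)) {
                return true;
            } else if ("FAILED".equals(state)) {
                // A startup failure may only mean the ACLs haven't propagated yet.
                connector.restart();
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated connector that fails until it has been restarted twice,
        // as if the ACL updates were still propagating across the cluster.
        final int[] restarts = {0};
        ConnectorHandle flaky = new ConnectorHandle() {
            public String state() { return restarts[0] < 2 ? "FAILED" : "RUNNING"; }
            public void restart() { restarts[0]++; }
        };
        boolean ok = awaitRunningWithRestarts(flaky, ACL_PROPAGATION_TIMEOUT_MS, 10);
        System.out.println("running=" + ok + ", restarts=" + restarts[0]); // prints "running=true, restarts=2"
    }
}
```

Giving the ACL retry its own named constant documents *why* the test waits, so if pre-flight validation and ACL propagation ever need different budgets, only one of them has to change.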