C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1676172863
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java:
##########
@@ -47,6 +50,11 @@ public Connect(H herder, ConnectRestServer rest) {
shutdownHook = new ShutdownHook();
}
+ // public for testing
+ public Future<?> getDistributedHerderFuture() {
Review Comment:
Nit: We don't use the `get` prefix in this codebase. Also, this method can
technically be called for more than just distributed workers. Maybe we could
call it `herderTask()` and note in the Javadoc that it will return `null` if the
herder type doesn't have a separate work thread?
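
A minimal sketch of what that could look like, using a simplified stand-in class. `ConnectSketch` is hypothetical and not the real `Connect` class, and the Javadoc wording is only an illustration of the idea:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for Connect; only the accessor matters here.
public class ConnectSketch {
    // May be null when the herder type has no separate work thread.
    private final Future<?> herderTask;

    public ConnectSketch(Future<?> herderTask) {
        this.herderTask = herderTask;
    }

    /**
     * @return the task running the herder's work thread, or null if this
     *         herder type does not have a separate work thread
     */
    // Visible for testing
    public Future<?> herderTask() {
        return herderTask;
    }

    public static void main(String[] args) {
        // A herder without its own thread: callers must handle null.
        System.out.println(new ConnectSketch(null).herderTask() == null);

        // A herder with a work thread: the same Future instance is returned.
        Future<?> task = CompletableFuture.completedFuture(null);
        System.out.println(new ConnectSketch(task).herderTask() == task);
    }
}
```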
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -232,6 +233,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final DistributedConfig config;
+ // visible for testing
+ public Future<?> herderFuture;
Review Comment:
This field should still be `private`, and only exposed via a getter:
```java
private Future<?> herderFuture;
...
public Future<?> herderFuture() {
return herderFuture;
}
```
Otherwise, it could be modified by other classes, which we almost certainly
do not want.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##########
@@ -152,11 +156,20 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I
"Body did not contain expected message detailing the worker's in-progress operation: " + body
);
}
+
connect.resetRequestTimeout();
+ Future<?> future = worker.getDistributedHerderFuture();
+
+ try {
+ future.get(5000, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException | ExecutionException exception) {
+ log.error("Failed to start a worker:", exception);
+ future.cancel(true);
+ }
Review Comment:
A few things:
1. We don't need or want to cancel the future; resource cleanup should take
place when we shut down the cluster at the end of the test (see
[here](https://github.com/apache/kafka/blob/0ada8fac6869cad8ac33a79032cf5d57bfa2a3ea/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java#L61-L65))
2. We should fail the test on a `TimeoutException`, since it indicates that
the worker hasn't failed to start up yet
3. We don't need to log anything, especially not at `ERROR` level, if we get
an `ExecutionException`
4. We should probably go with a higher timeout; our CI can be really slow
sometimes, and a timeout that is too low might cause flakiness
```suggestion
assertThrows(
ExecutionException.class,
() -> future.get(1, TimeUnit.MINUTES)
);
```
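
For illustration, here is a self-contained sketch of the behavior the assertion relies on, written in plain Java without JUnit. `HerderFailureSketch` and `awaitFailure` are hypothetical names, and the failing `Runnable` only stands in for a herder that fails on startup. A task that throws surfaces as an `ExecutionException` from `Future.get`, while a timeout is treated as a test failure rather than being caught and logged:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HerderFailureSketch {

    // Hypothetical helper: waits for the task to fail and returns the cause.
    // A timeout or a normal completion is reported as an assertion failure.
    static Throwable awaitFailure(Future<?> task, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            task.get(timeout, unit);
        } catch (ExecutionException e) {
            // The task threw; this is the outcome the test expects.
            return e.getCause();
        } catch (TimeoutException e) {
            // The task is still running, so it has not failed yet.
            throw new AssertionError("Task did not fail within the timeout", e);
        }
        throw new AssertionError("Task completed without failing");
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a herder thread that fails during startup.
            Runnable failingStartup = () -> {
                throw new IllegalStateException("startup failed");
            };
            Future<?> task = executor.submit(failingStartup);

            // Generous timeout, since CI machines can be slow.
            Throwable cause = awaitFailure(task, 1, TimeUnit.MINUTES);
            System.out.println(cause.getMessage());
        } finally {
            // Cleanup happens at shutdown; the future itself is never cancelled.
            executor.shutdownNow();
        }
    }
}
```

Note that the future is never cancelled here either; the executor shutdown in the `finally` block plays the role of the cluster shutdown at the end of the test.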