C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1676172863


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java:
##########
@@ -47,6 +50,11 @@ public Connect(H herder, ConnectRestServer rest) {
         shutdownHook = new ShutdownHook();
     }
 
+    // public for testing
+    public Future<?> getDistributedHerderFuture() {

Review Comment:
   Nit: We don't use the `get` prefix in this codebase. Also, this method can 
technically be called for more than just distributed workers. Maybe we could 
call it `herderTask()` and note in the Javadocs that it will return null if the 
herder type doesn't have a separate work thread?
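A minimal sketch of the suggested rename. It assumes, hypothetically, that each herder reports its own work-thread future, and that herders without a separate work thread inherit a default that returns null. `Herder`, `StandaloneLikeHerder`, and `DistributedLikeHerder` are simplified stand-ins, not the real Connect runtime types:

```java
import java.util.concurrent.Future;

// Hypothetical, simplified stand-ins for the Connect runtime types;
// the real herder interface and Connect class have many more members.
interface Herder {
    /**
     * @return the future tracking this herder's separate work thread, or
     *         null if this herder type does not run one
     */
    default Future<?> herderTask() {
        return null;
    }
}

// Hypothetical herder with no separate work thread: inherits the null default.
class StandaloneLikeHerder implements Herder {
}

// Hypothetical herder that runs on its own work thread.
class DistributedLikeHerder implements Herder {
    private final Future<?> herderTask;

    DistributedLikeHerder(Future<?> herderTask) {
        this.herderTask = herderTask;
    }

    @Override
    public Future<?> herderTask() {
        return herderTask;
    }
}

class Connect<H extends Herder> {
    private final H herder;

    Connect(H herder) {
        this.herder = herder;
    }

    /**
     * Visible for testing.
     *
     * @return the future tracking the herder's work thread, or null if the
     *         herder type doesn't have a separate work thread
     */
    public Future<?> herderTask() {
        return herder.herderTask();
    }
}
```

One option this sketch illustrates: letting each herder type decide whether it has a task keeps `Connect` from needing to know which herder implementation it wraps.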



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -232,6 +233,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final DistributedConfig config;
 
+    // visible for testing
+    public Future<?> herderFuture;

Review Comment:
   This field should still be `private`, and only exposed via a getter:
   ```java
   private Future<?> herderFuture;
   
   ...
   
   public Future<?> herderFuture() {
       return herderFuture;
   }
   ```
   
   Otherwise, it could be modified by other classes, which we almost certainly 
do not want.
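For context, here is one way such a field might be populated and exposed read-only. This sketch assumes the herder submits itself to a single-thread executor in `start()` and keeps the returned future. `SketchHerder` and its members are hypothetical and pared down to the encapsulation point:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, pared-down herder: only the parts relevant to
// keeping the future private and exposing it via a getter are shown.
class SketchHerder implements Runnable {
    private final ExecutorService herderExecutor = Executors.newSingleThreadExecutor();

    // Private, so only this class can assign it. Volatile because it is
    // written by the thread calling start() and may be read by others.
    private volatile Future<?> herderFuture;

    public void start() {
        // Keep the future returned by submit() so callers can observe
        // whether run() completed normally or threw.
        herderFuture = herderExecutor.submit(this);
    }

    // Read-only accessor, visible for testing.
    public Future<?> herderFuture() {
        return herderFuture;
    }

    @Override
    public void run() {
        // Stand-in for the herder's main loop.
    }

    public void stop() {
        herderExecutor.shutdown();
    }
}
```

Other classes can observe the future (for example via `get()` or `isDone()`) but cannot replace it.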



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java:
##########
@@ -152,11 +156,20 @@ public void testFailToCreateInternalTopicsWithMoreReplicasThanBrokers() throws I
                     "Body did not contain expected message detailing the worker's in-progress operation: " + body
             );
         }
+
         connect.resetRequestTimeout();
 
+        Future<?> future = worker.getDistributedHerderFuture();
+
+        try {
+            future.get(5000, TimeUnit.MILLISECONDS);
+        }  catch (TimeoutException | ExecutionException exception) {
+            log.error("Failed to start a worker:", exception);
+            future.cancel(true);
+        }

Review Comment:
   A few things:
   1. We don't need or want to cancel the future; resource cleanup should take 
place when we shut down the cluster at the end of the test (see 
[here](https://github.com/apache/kafka/blob/0ada8fac6869cad8ac33a79032cf5d57bfa2a3ea/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java#L61-L65))
   2. We should fail the test on a `TimeoutException`, since it indicates that the worker hasn't failed to start up yet
   3. We don't need to log anything, especially not at `ERROR` level, if we get an `ExecutionException`, since that's the outcome this test expects
   4. We should probably go with a higher timeout; our CI can be really slow at times, and a timeout that's too low might make the test flaky
   
   ```suggestion
           assertThrows(
               ExecutionException.class,
               () -> future.get(1, TimeUnit.MINUTES)
           );
   ```
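The suggestion relies on how a bounded `Future.get(timeout, unit)` call ends. It throws an `ExecutionException` if the task already finished by throwing (here, the worker failing to start, as the test expects), and a `TimeoutException` if the task is still running. The small helper below makes the two cases explicit without JUnit; `FutureOutcomes.outcome` is a hypothetical name, not part of Kafka:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that classifies how a bounded get() on a future ends.
class FutureOutcomes {
    static String outcome(Future<?> future, long timeout, TimeUnit unit) throws InterruptedException {
        try {
            future.get(timeout, unit);
            // The task finished without throwing.
            return "completed";
        } catch (ExecutionException e) {
            // The task finished by throwing; the original exception is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            // The task has not finished within the timeout.
            return "still running";
        }
    }
}
```

With `assertThrows(ExecutionException.class, ...)`, a `TimeoutException` escaping from `get()` is not the expected exception type, so the assertion fails the test. That is the behavior point 2 asks for.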



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.