mxm commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2822676861


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {
+    long startTime = System.nanoTime();

Review Comment:
   ```suggestion
       long startTimeMs = System.currentTimeMillis();
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {
+    long startTime = System.nanoTime();
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();

Review Comment:
   ```suggestion
       Queue<ExecutorService> pendingShutdown = new ArrayDeque<>();
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +65,14 @@ private ThreadPools() {}
   public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
       SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();
 
+  private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;

Review Comment:
   ```suggestion
     private static final Duration SHUTDOWN_TIMEOUT_SECONDS = 
Duration.ofSeconds(120);
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {
+    long startTime = System.nanoTime();
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      invoked.add(item);
+    }
+    while ((item = invoked.poll()) != null) {
+      long timeElapsed = System.nanoTime() - startTime;
+      long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L - 
timeElapsed;
+      if (remainingTime > 0) {

Review Comment:
   ```suggestion
         if (System.currentTimeMillis() - startTimeMs > 
SHUTDOWN_TIME_OUT.millis()) {
            break;
         }
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {

Review Comment:
   ```suggestion
     public static void shutdownThreadPools() {
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {

Review Comment:
   I think first thing we should remove the corresponding shutdown hook.



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutDownStartedThreadPools() {

Review Comment:
   It works, but it is quite nondeterministic how the shutdown is executed. 
With the current code, the shutdown timeout can be reset multiple times if 
there are multiple calls to shutdownThreadPools().



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {
+    long startTime = System.nanoTime();
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      invoked.add(item);
+    }
+    while ((item = invoked.poll()) != null) {
+      long timeElapsed = System.nanoTime() - startTime;
+      long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L - 
timeElapsed;
+      if (remainingTime > 0) {
+        try {
+          item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);

Review Comment:
   Should we also try this?
   ```suggestion
             if (!item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
               item.snapshotNow();
             }
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String 
namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int 
poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, 
poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link 
#newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutdownStartedThreadPools() {
+    long startTime = System.nanoTime();

Review Comment:
   Probably sufficient to use System.currentTimeMillis() here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to