pvary commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2904839890


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -180,15 +265,80 @@ public static ScheduledExecutorService 
newScheduledPool(String namePrefix, int p
    * is suitable for long-lived thread pools that should be automatically 
cleaned up on JVM
    * shutdown.
    */
-  public static ScheduledExecutorService newExitingScheduledPool(
+  public static synchronized ScheduledExecutorService newExitingScheduledPool(
       String namePrefix, int poolSize, Duration terminationTimeout) {
-    return MoreExecutors.getExitingScheduledExecutorService(
-        (ScheduledThreadPoolExecutor) newScheduledPool(namePrefix, poolSize),
-        terminationTimeout.toMillis(),
-        TimeUnit.MILLISECONDS);
+    ScheduledExecutorService service =
+        
Executors.unconfigurableScheduledExecutorService(newScheduledPool(namePrefix, 
poolSize));
+    THREAD_POOL_MANAGER.addThreadPool(service, terminationTimeout);
+    return service;
   }
 
   private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix 
+ "-%d").build();
   }
+
+  /** Manages the lifecycle of thread pools that need to be shut down 
gracefully. */
+  @VisibleForTesting
+  static class ThreadPoolManager {
+    private final List<ExecutorServiceWithTimeout> threadPoolsToShutdown = 
Lists.newArrayList();
+
+    /**
+     * Add an executor service to the list of thread pools to be shut down.
+     *
+     * @param service the executor service to add
+     * @param timeout the timeout for shutdown operations
+     */
+    synchronized void addThreadPool(ExecutorService service, Duration timeout) 
{
+      threadPoolsToShutdown.add(new ExecutorServiceWithTimeout(service, 
timeout));
+    }
+
+    /** Shut down all registered thread pools. */
+    synchronized void shutdownAll() {
+      long startTime = System.nanoTime();
+      List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+
+      for (ExecutorServiceWithTimeout item : threadPoolsToShutdown) {
+        item.getService().shutdown();
+        pendingShutdown.add(item);
+      }
+
+      threadPoolsToShutdown.clear();
+
+      for (ExecutorServiceWithTimeout item : pendingShutdown) {
+        long timeElapsed = System.nanoTime() - startTime;
+        long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+        if (remainingTime > 0) {
+
+          try {
+            if (!item.service.awaitTermination(remainingTime, 
TimeUnit.NANOSECONDS)) {
+              item.getService().shutdownNow();
+            }
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while shutting down, ignoring", e);
+          }
+
+        } else {
+          item.getService().shutdownNow();
+        }
+      }
+    }
+  }
+
+  private static class ExecutorServiceWithTimeout {
+    private ExecutorService service;
+    private Duration timeout;
+
+    private ExecutorServiceWithTimeout(ExecutorService service, Duration 
timeout) {
+      this.service = service;
+      this.timeout = timeout;
+    }
+
+    private ExecutorService getService() {
+      return service;
+    }
+
+    private Duration getTimeout() {
+      return timeout;
+    }
+  }

Review Comment:
   ```suggestion
     private record ExecutorServiceWithTimeout(ExecutorService service, 
Duration timeout) {}
   ```
   
   Make this a record and adjust the accessor usage accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to