pvary commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2904839890
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -180,15 +265,80 @@ public static ScheduledExecutorService
newScheduledPool(String namePrefix, int p
* is suitable for long-lived thread pools that should be automatically
cleaned up on JVM
* shutdown.
*/
- public static ScheduledExecutorService newExitingScheduledPool(
+ public static synchronized ScheduledExecutorService newExitingScheduledPool(
String namePrefix, int poolSize, Duration terminationTimeout) {
- return MoreExecutors.getExitingScheduledExecutorService(
- (ScheduledThreadPoolExecutor) newScheduledPool(namePrefix, poolSize),
- terminationTimeout.toMillis(),
- TimeUnit.MILLISECONDS);
+ ScheduledExecutorService service =
+
Executors.unconfigurableScheduledExecutorService(newScheduledPool(namePrefix,
poolSize));
+ THREAD_POOL_MANAGER.addThreadPool(service, terminationTimeout);
+ return service;
}
private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix
+ "-%d").build();
}
+
+ /** Manages the lifecycle of thread pools that need to be shut down
gracefully. */
+ @VisibleForTesting
+ static class ThreadPoolManager {
+ private final List<ExecutorServiceWithTimeout> threadPoolsToShutdown =
Lists.newArrayList();
+
+ /**
+ * Add an executor service to the list of thread pools to be shut down.
+ *
+ * @param service the executor service to add
+ * @param timeout the timeout for shutdown operations
+ */
+ synchronized void addThreadPool(ExecutorService service, Duration timeout)
{
+ threadPoolsToShutdown.add(new ExecutorServiceWithTimeout(service,
timeout));
+ }
+
+ /** Shut down all registered thread pools. */
+ synchronized void shutdownAll() {
+ long startTime = System.nanoTime();
+ List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+
+ for (ExecutorServiceWithTimeout item : threadPoolsToShutdown) {
+ item.getService().shutdown();
+ pendingShutdown.add(item);
+ }
+
+ threadPoolsToShutdown.clear();
+
+ for (ExecutorServiceWithTimeout item : pendingShutdown) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+ if (remainingTime > 0) {
+
+ try {
+ if (!item.service.awaitTermination(remainingTime,
TimeUnit.NANOSECONDS)) {
+ item.getService().shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while shutting down, ignoring", e);
+ }
+
+ } else {
+ item.getService().shutdownNow();
+ }
+ }
+ }
+ }
+
+ private static class ExecutorServiceWithTimeout {
+ private ExecutorService service;
+ private Duration timeout;
+
+ private ExecutorServiceWithTimeout(ExecutorService service, Duration
timeout) {
+ this.service = service;
+ this.timeout = timeout;
+ }
+
+ private ExecutorService getService() {
+ return service;
+ }
+
+ private Duration getTimeout() {
+ return timeout;
+ }
+ }
Review Comment:
```suggestion
private record ExecutorServiceWithTimeout(ExecutorService service, Duration timeout) {}
```
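Make this a record and adjust the accessor usages accordingly (i.e. call `item.service()` / `item.timeout()` instead of `item.getService()` / `item.getTimeout()`).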
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]