mxm commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2822676861
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
+ long startTime = System.nanoTime();
Review Comment:
```suggestion
long startTimeMs = System.currentTimeMillis();
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
+ long startTime = System.nanoTime();
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
Review Comment:
```suggestion
Queue<ExecutorService> pendingShutdown = new ArrayDeque<>();
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +65,14 @@ private ThreadPools() {}
public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();
+ private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;
Review Comment:
```suggestion
private static final Duration SHUTDOWN_TIMEOUT_SECONDS =
Duration.ofSeconds(120);
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
+ long startTime = System.nanoTime();
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ invoked.add(item);
+ }
+ while ((item = invoked.poll()) != null) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L -
timeElapsed;
+ if (remainingTime > 0) {
Review Comment:
```suggestion
if (System.currentTimeMillis() - startTimeMs >
SHUTDOWN_TIMEOUT_SECONDS.toMillis()) {
break;
}
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
Review Comment:
```suggestion
public static void shutdownThreadPools() {
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
Review Comment:
I think the first thing we should do here is remove the corresponding shutdown hook.
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutDownStartedThreadPools() {
Review Comment:
It works, but it is quite nondeterministic how the shutdown is executed.
With the current code, the shutdown timeout can be reset multiple times if
there are multiple calls to shutdownThreadPools().
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
+ long startTime = System.nanoTime();
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ invoked.add(item);
+ }
+ while ((item = invoked.poll()) != null) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = SHUTDOWN_TIMEOUT_SECONDS * 1_000_000_000L -
timeElapsed;
+ if (remainingTime > 0) {
+ try {
+ item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
Review Comment:
Should we also try this?
```suggestion
if (!item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
item.shutdownNow();
}
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,85 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutdownStartedThreadPools() {
+ long startTime = System.nanoTime();
Review Comment:
Probably sufficient to use System.currentTimeMillis() here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]