DapengShi opened a new issue, #10987: URL: https://github.com/apache/iceberg/issues/10987
### Apache Iceberg version

main (development)

### Query engine

Spark

### Please describe the bug 🐞

We ran Spark on Kubernetes, and this deadlock happened when we set the Spark executor CPU to 1. Here is the key part of the executor's thread dump:

```
"Executor task launch worker for task 39128.1 in stage 1.0 (TID 52199)" #5788 daemon prio=5 os_prio=0 tid=0x00007f1964042800 nid=0x16d3 sleeping[0x00007f190adfd000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.iceberg.util.Tasks.waitFor(Tasks.java:518)
	at org.apache.iceberg.util.Tasks.access$800(Tasks.java:42)
	at org.apache.iceberg.util.Tasks$Builder.runParallel(Tasks.java:358)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:201)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.data.BaseDeleteLoader.execute(BaseDeleteLoader.java:236)
	at org.apache.iceberg.data.BaseDeleteLoader.loadPositionDeletes(BaseDeleteLoader.java:150)
	at org.apache.iceberg.data.DeleteFilter.deletedRowPositions(DeleteFilter.java:227)
	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.posDelRowIdMapping(ColumnarBatchReader.java:156)
	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.initRowIdMapping(ColumnarBatchReader.java:144)
	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.loadDataToColumnBatch(ColumnarBatchReader.java:96)
	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:72)
	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:44)
	at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:147)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:138)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1$$Lambda$862/982683868.apply(Unknown Source)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$508/1366295625.apply(Unknown Source)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

   Locked ownable synchronizers:
	- <0x00000004550025b8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

...

"ForkJoinPool.commonPool-worker-0" #5789 daemon prio=5 os_prio=0 tid=0x00007f18ac005800 nid=0x16d4 waiting for monitor entry [0x00007f190b9fe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1760)
	- waiting to lock <0x0000000546031ec8> (a java.util.concurrent.ConcurrentHashMap$Node)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.evictEntry(BoundedLocalCache.java:912)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:820)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:806)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireEntries(BoundedLocalCache.java:784)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.maintenance(BoundedLocalCache.java:1491)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.performCleanUp(BoundedLocalCache.java:1460)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$PerformCleanupTask.run(BoundedLocalCache.java:3359)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$PerformCleanupTask.exec(BoundedLocalCache.java:3346)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

   Locked ownable synchronizers:
	- <0x0000000481f0eb08> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

...

"iceberg-delete-worker-pool-3" #85 daemon prio=5 os_prio=0 tid=0x00007f1974fd1800 nid=0x8e in Object.wait() [0x00007f1949dfd000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
	- locked <0x0000000455002648> (a java.util.stream.ReduceOps$ReduceTask)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap.entrySet(EvaluatingStatisticsMap.java:166)
	- locked <0x00000004550026a0> (a org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap)
	at java.util.Collections$UnmodifiableMap.entrySet(Collections.java:1483)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.copyMap(IOStatisticsBinding.java:173)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap(IOStatisticsBinding.java:219)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap(IOStatisticsBinding.java:202)
	at org.apache.hadoop.fs.statistics.IOStatisticsSnapshot.snapshot(IOStatisticsSnapshot.java:163)
	- locked <0x0000000455002728> (a org.apache.hadoop.fs.statistics.IOStatisticsSnapshot)
	at org.apache.hadoop.fs.statistics.IOStatisticsSnapshot.<init>(IOStatisticsSnapshot.java:125)
	at org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics(IOStatisticsSupport.java:49)
	at org.apache.hadoop.fs.s3a.S3AInstrumentation$InputStreamStatistics.merge(S3AInstrumentation.java:1220)
	at org.apache.hadoop.fs.s3a.S3AInstrumentation$InputStreamStatistics.close(S3AInstrumentation.java:1126)
	at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:620)
	- locked <0x0000000455002770> (a org.apache.hadoop.fs.s3a.S3AInputStream)
	at java.io.FilterInputStream.close(FilterInputStream.java:181)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingSeekableInputStream.close(DelegatingSeekableInputStream.java:50)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.close(ParquetFileReader.java:1447)
	at org.apache.iceberg.parquet.ParquetReader$FileIterator.close(ParquetReader.java:161)
	at org.apache.iceberg.io.CloseableGroup.close(CloseableGroup.java:80)
	at org.apache.iceberg.deletes.Deletes.toPositionIndexes(Deletes.java:154)
	at org.apache.iceberg.data.BaseDeleteLoader.readPosDeletes(BaseDeleteLoader.java:169)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$getOrReadPosDeletes$3(BaseDeleteLoader.java:160)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1035/194701756.get(Unknown Source)
	at org.apache.iceberg.spark.SparkExecutorCache.lambda$loadFunc$0(SparkExecutorCache.java:122)
	at org.apache.iceberg.spark.SparkExecutorCache$$Lambda$1042/979932856.apply(Unknown Source)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.lambda$statsAware$0(LocalCache.java:139)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache$$Lambda$1046/1506524809.apply(Unknown Source)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2431)
	- locked <0x000000054a1eb350> (a org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.PSAMW)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$867/1581302069.apply(Unknown Source)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
	- locked <0x0000000546031ec8> (a java.util.concurrent.ConcurrentHashMap$Node)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.spark.SparkExecutorCache.getOrLoad(SparkExecutorCache.java:114)
	at org.apache.iceberg.spark.source.BaseReader$SparkDeleteFilter$CachingDeleteLoader.getOrLoad(BaseReader.java:297)
	at org.apache.iceberg.data.BaseDeleteLoader.getOrReadPosDeletes(BaseDeleteLoader.java:160)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$loadPositionDeletes$2(BaseDeleteLoader.java:150)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1032/2004566363.apply(Unknown Source)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$execute$7(BaseDeleteLoader.java:236)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1034/1084935997.run(Unknown Source)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
	at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

   Locked ownable synchronizers:
	- <0x0000000481e9ec38> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"iceberg-delete-worker-pool-2" #84 daemon prio=5 os_prio=0 tid=0x00007f1974fcf800 nid=0x8d waiting on condition [0x00007f1949efe000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0000000481e84b48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

   Locked ownable synchronizers:
	- None

"iceberg-delete-worker-pool-1" #83 daemon prio=5 os_prio=0 tid=0x00007f1974fce000 nid=0x8c waiting on condition [0x00007f1949ffe000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0000000481e84b48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

   Locked ownable synchronizers:
	- None

"iceberg-delete-worker-pool-0" #82 daemon prio=5 os_prio=0 tid=0x00007f1974fcc000 nid=0x8b waiting on condition [0x00007f194a7fe000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0000000481e84b48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

   Locked ownable synchronizers:
	- None
```

As the dump shows, the task thread calls `org.apache.iceberg.data.DeleteFilter.deletedRowPositions` and waits for all `iceberg-delete-worker-pool` threads to finish their tasks. Only one task was left, running on `iceberg-delete-worker-pool-3`, and that task called `BoundedLocalCache.computeIfAbsent`. As the `ForkJoinPool.commonPool-worker-0` stack implies, `BoundedLocalCache` appears to have triggered a cleanup task during this `computeIfAbsent`; `BoundedLocalCache` uses the ForkJoin common pool as its default executor for such cleanup tasks. The cleanup task was blocked on a `ConcurrentHashMap` node lock, so it was effectively waiting for `iceberg-delete-worker-pool-3` to finish. However, inside the `computeIfAbsent` running on `iceberg-delete-worker-pool-3`, `org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap.entrySet` was called, and it uses `parallelStream()` to transform the set.
`parallelStream()` also submits its work to the ForkJoin common pool. Because the Spark executor CPU was set to 1, the ForkJoin common pool had only one thread, and that thread was blocked by the `BoundedLocalCache` cleanup task, so the `parallelStream()` work could never run. Each side was waiting on the other: a deadlock.

### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
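The failure mode above is the classic pool-starvation pattern: a task running on a pool blocks waiting for other work queued to that same pool, and when the pool has only one thread, the queued work can never start. A minimal, self-contained Java sketch of that pattern (a plain single-thread executor stands in for the one-thread ForkJoin common pool; the class and method names here are illustrative, not from Iceberg or Caffeine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolStarvationDemo {

    // Returns "starved" when the inner task never gets a thread to run on.
    static String demo() {
        // A single-thread pool stands in for the ForkJoin common pool of a
        // 1-CPU executor, where common-pool parallelism collapses to one worker.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = pool.submit(() -> {
                // The outer task (analogous to Caffeine's cleanup task) submits
                // more work to the same pool (analogous to parallelStream())
                // and blocks on it. The pool's only thread is busy running this
                // very task, so the inner task can never start.
                Future<String> inner = pool.submit(() -> "done");
                try {
                    return inner.get(500, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // A real caller with no timeout would hang here forever.
                    return "starved";
                }
            });
            return outer.get();
        } catch (Exception e) {
            return "error: " + e;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "starved"
    }
}
```

The timeout is only there so the sketch terminates; the threads in the reported dump have no such timeout, which is why the executor hangs. This also suggests why the bug only surfaces with `spark.executor.cores=1`: with more CPUs the common pool has spare workers that can pick up the `parallelStream()` tasks.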