DapengShi opened a new issue, #10987:
URL: https://github.com/apache/iceberg/issues/10987

   ### Apache Iceberg version
   
   main (development)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   We ran Spark on Kubernetes and hit this deadlock when the Spark executor 
CPU count was set to 1.
   Here are the key threads from the executor's thread dump:
   ```
   "Executor task launch worker for task 39128.1 in stage 1.0 (TID 52199)" 
#5788 daemon prio=5 os_prio=0 tid=0x00007f1964042800 nid=0x16d3 
sleeping[0x00007f190adfd000]
      java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.iceberg.util.Tasks.waitFor(Tasks.java:518)
        at org.apache.iceberg.util.Tasks.access$800(Tasks.java:42)
        at org.apache.iceberg.util.Tasks$Builder.runParallel(Tasks.java:358)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:201)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at 
org.apache.iceberg.data.BaseDeleteLoader.execute(BaseDeleteLoader.java:236)
        at 
org.apache.iceberg.data.BaseDeleteLoader.loadPositionDeletes(BaseDeleteLoader.java:150)
        at 
org.apache.iceberg.data.DeleteFilter.deletedRowPositions(DeleteFilter.java:227)
        at 
org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.posDelRowIdMapping(ColumnarBatchReader.java:156)
        at 
org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.initRowIdMapping(ColumnarBatchReader.java:144)
        at 
org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.loadDataToColumnBatch(ColumnarBatchReader.java:96)
        at 
org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:72)
        at 
org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:44)
        at 
org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:147)
        at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:138)
        at 
org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
        at 
org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1$$Lambda$862/982683868.apply(Unknown
 Source)
        at scala.Option.exists(Option.scala:376)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
        at 
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$508/1366295625.apply(Unknown
 Source)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
      Locked ownable synchronizers:
        - <0x00000004550025b8> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)
   ...
   
   "ForkJoinPool.commonPool-worker-0" #5789 daemon prio=5 os_prio=0 
tid=0x00007f18ac005800 nid=0x16d4 waiting for monitor entry [0x00007f190b9fe000]
      java.lang.Thread.State: BLOCKED (on object monitor)
        at 
java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1760)
        - waiting to lock <0x0000000546031ec8> (a 
java.util.concurrent.ConcurrentHashMap$Node)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.evictEntry(BoundedLocalCache.java:912)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:820)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireAfterAccessEntries(BoundedLocalCache.java:806)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.expireEntries(BoundedLocalCache.java:784)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.maintenance(BoundedLocalCache.java:1491)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.performCleanUp(BoundedLocalCache.java:1460)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$PerformCleanupTask.run(BoundedLocalCache.java:3359)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$PerformCleanupTask.exec(BoundedLocalCache.java:3346)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   
      Locked ownable synchronizers:
        - <0x0000000481f0eb08> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
   
   ...
   
   "iceberg-delete-worker-pool-3" #85 daemon prio=5 os_prio=0 
tid=0x00007f1974fd1800 nid=0x8e in Object.wait() [0x00007f1949dfd000]
      java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at 
java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
        - locked <0x0000000455002648> (a java.util.stream.ReduceOps$ReduceTask)
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
        at 
java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at 
org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap.entrySet(EvaluatingStatisticsMap.java:166)
        - locked <0x00000004550026a0> (a 
org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap)
        at java.util.Collections$UnmodifiableMap.entrySet(Collections.java:1483)
        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.copyMap(IOStatisticsBinding.java:173)
        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap(IOStatisticsBinding.java:219)
        at 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap(IOStatisticsBinding.java:202)
        at 
org.apache.hadoop.fs.statistics.IOStatisticsSnapshot.snapshot(IOStatisticsSnapshot.java:163)
        - locked <0x0000000455002728> (a 
org.apache.hadoop.fs.statistics.IOStatisticsSnapshot)
        at 
org.apache.hadoop.fs.statistics.IOStatisticsSnapshot.<init>(IOStatisticsSnapshot.java:125)
        at 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics(IOStatisticsSupport.java:49)
        at 
org.apache.hadoop.fs.s3a.S3AInstrumentation$InputStreamStatistics.merge(S3AInstrumentation.java:1220)
        at 
org.apache.hadoop.fs.s3a.S3AInstrumentation$InputStreamStatistics.close(S3AInstrumentation.java:1126)
        at 
org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:620)
        - locked <0x0000000455002770> (a 
org.apache.hadoop.fs.s3a.S3AInputStream)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at 
org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingSeekableInputStream.close(DelegatingSeekableInputStream.java:50)
        at 
org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.close(ParquetFileReader.java:1447)
        at 
org.apache.iceberg.parquet.ParquetReader$FileIterator.close(ParquetReader.java:161)
        at org.apache.iceberg.io.CloseableGroup.close(CloseableGroup.java:80)
        at 
org.apache.iceberg.deletes.Deletes.toPositionIndexes(Deletes.java:154)
        at 
org.apache.iceberg.data.BaseDeleteLoader.readPosDeletes(BaseDeleteLoader.java:169)
        at 
org.apache.iceberg.data.BaseDeleteLoader.lambda$getOrReadPosDeletes$3(BaseDeleteLoader.java:160)
        at 
org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1035/194701756.get(Unknown 
Source)
        at 
org.apache.iceberg.spark.SparkExecutorCache.lambda$loadFunc$0(SparkExecutorCache.java:122)
        at 
org.apache.iceberg.spark.SparkExecutorCache$$Lambda$1042/979932856.apply(Unknown
 Source)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.lambda$statsAware$0(LocalCache.java:139)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache$$Lambda$1046/1506524809.apply(Unknown
 Source)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2431)
        - locked <0x000000054a1eb350> (a 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.PSAMW)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$867/1581302069.apply(Unknown
 Source)
        at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
        - locked <0x0000000546031ec8> (a 
java.util.concurrent.ConcurrentHashMap$Node)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at 
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at 
org.apache.iceberg.spark.SparkExecutorCache.getOrLoad(SparkExecutorCache.java:114)
        at 
org.apache.iceberg.spark.source.BaseReader$SparkDeleteFilter$CachingDeleteLoader.getOrLoad(BaseReader.java:297)
        at 
org.apache.iceberg.data.BaseDeleteLoader.getOrReadPosDeletes(BaseDeleteLoader.java:160)
        at 
org.apache.iceberg.data.BaseDeleteLoader.lambda$loadPositionDeletes$2(BaseDeleteLoader.java:150)
        at 
org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1032/2004566363.apply(Unknown 
Source)
        at 
org.apache.iceberg.data.BaseDeleteLoader.lambda$execute$7(BaseDeleteLoader.java:236)
        at 
org.apache.iceberg.data.BaseDeleteLoader$$Lambda$1034/1084935997.run(Unknown 
Source)
        at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
        at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
        at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
      Locked ownable synchronizers:
        - <0x0000000481e9ec38> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)
   
   "iceberg-delete-worker-pool-2" #84 daemon prio=5 os_prio=0 
tid=0x00007f1974fcf800 nid=0x8d waiting on condition [0x00007f1949efe000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000481e84b48> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
      Locked ownable synchronizers:
        - None
   
   "iceberg-delete-worker-pool-1" #83 daemon prio=5 os_prio=0 
tid=0x00007f1974fce000 nid=0x8c waiting on condition [0x00007f1949ffe000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000481e84b48> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
      Locked ownable synchronizers:
        - None
   
   "iceberg-delete-worker-pool-0" #82 daemon prio=5 os_prio=0 
tid=0x00007f1974fcc000 nid=0x8b waiting on condition [0x00007f194a7fe000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000481e84b48> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
      Locked ownable synchronizers:
        - None
   ```
   
   As the dump shows, the task thread called 
`org.apache.iceberg.data.DeleteFilter.deletedRowPositions` and was waiting for 
all `iceberg-delete-worker-pool` threads to finish their tasks. Only one task 
was still running, on `iceberg-delete-worker-pool-3`, inside 
`BoundedLocalCache.computeIfAbsent`. As the `ForkJoinPool.commonPool-worker-0` 
stack suggests, `BoundedLocalCache` triggered a cleanup task during this 
`computeIfAbsent`, and `BoundedLocalCache` executes cleanup tasks on the 
ForkJoin common pool by default. That cleanup task was blocked on a 
`ConcurrentHashMap` node lock held by `iceberg-delete-worker-pool-3`, so it 
had to wait for that thread to finish. However, inside the `computeIfAbsent` 
on `iceberg-delete-worker-pool-3`, 
`org.apache.hadoop.fs.statistics.impl.EvaluatingStatisticsMap.entrySet` was 
called, which uses `parallelStream()` to transform the set, and 
`parallelStream()` also schedules its work on the ForkJoin common pool. 
Because the Spark executor CPU count was set to 1, the common pool had only 
one worker thread, and that worker was occupied by the blocked 
`BoundedLocalCache` cleanup task, so the `parallelStream()` work could never 
run. The result is a deadlock: `iceberg-delete-worker-pool-3` waits on the 
common pool, and the common pool's only worker waits on 
`iceberg-delete-worker-pool-3`. 
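   To illustrate the coupling described above, here is a minimal, self-contained Java sketch (not Iceberg or Hadoop code): by default `parallelStream()` runs its tasks on `ForkJoinPool.commonPool()`, whose worker count is derived from the CPU count, so on a 1-CPU executor a single blocked common-pool worker stalls every other common-pool user. Submitting the stream from inside a dedicated `ForkJoinPool` is a widely used (though undocumented) way to route the stream's work onto that pool instead:

   ```java
   import java.util.List;
   import java.util.concurrent.ForkJoinPool;

   public class CommonPoolCoupling {
       public static void main(String[] args) throws Exception {
           List<Integer> data = List.of(1, 2, 3, 4);

           // Runs on ForkJoinPool.commonPool(): with availableProcessors() == 1
           // the common pool has a single worker, so any task that blocks it
           // (like the Caffeine cleanup task above) stalls all other users.
           int commonPoolSum = data.parallelStream().mapToInt(Integer::intValue).sum();

           // Submitting the stream from inside a dedicated ForkJoinPool makes
           // the stream's fork/join tasks run on that pool, decoupling it from
           // the common pool.
           ForkJoinPool dedicated = new ForkJoinPool(2);
           int dedicatedSum = dedicated.submit(
               () -> data.parallelStream().mapToInt(Integer::intValue).sum()
           ).get();
           dedicated.shutdown();

           System.out.println(commonPoolSum + " " + dedicatedSum);
       }
   }
   ```

   This is only a sketch of the mechanism, not a proposed fix; an actual fix would likely avoid blocking the common pool in the first place (for example by giving the Caffeine cache an explicit executor, or by not using `parallelStream()` inside `computeIfAbsent`).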
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

