shanzi opened a new issue, #16426:
URL: https://github.com/apache/iceberg/issues/16426

   ### Apache Iceberg version
   
   1.10.2
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   When a Flink task using the Iceberg Flink source is cancelled while the source fetcher is blocked waiting for an array-pool entry, the fetcher thread is not woken up.
   
   `IcebergSourceSplitReader#wakeUp()` is currently a no-op, while `ArrayPoolDataIteratorBatcher.ArrayPoolBatchIterator#getCachedEntry()` calls `Pool#pollEntry()`, which blocks indefinitely. Flink’s source fetcher shutdown path relies on `SplitReader#wakeUp()` to cooperatively unblock the fetcher. Because Iceberg does not unblock the current batch iterator, the fetcher can remain parked forever after cancellation.
   
   Over repeated Flink failovers, this leaks `Source Data Fetcher` threads, retained Parquet buffers, S3 streams, and references into the operator chain until TaskManagers become GC-bound or die.
   
   #### Expected behavior
   
   Calling `IcebergSourceSplitReader#wakeUp()` during task cancellation should unblock any fetcher waiting for an array-pool entry, close the current reader, and allow the `Source Data Fetcher` thread to exit cleanly.
   
   #### Actual behavior
   
   The fetcher thread remains stuck indefinitely in:
   
   ```text
   "Source Data Fetcher ..." WAITING (parking)
     at java.util.concurrent.ArrayBlockingQueue.take
     at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
     at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.getCachedEntry(...)
     at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(...)
     at org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(...)
     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(...)
     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(...)
     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(...)
   ```
   
   Flink then logs:
   
   ```text
   Failed to close the source reader in 30000 ms. There are still 1 split fetchers running
   ```
   
   After that timeout, cancellation proceeds, but the fetcher thread remains alive and retains memory and other resources.
   
   #### Production evidence
   
   Observed in production with:
   
   ```text
   Flink: 1.19.3
   Iceberg: 1.10.1
   Java: 17
   Parallelism: 80
   TaskManagers: 20 x 4 slots
   Source pattern: multiple Iceberg source operators reading Parquet from S3-backed tables
   ```
   
   After repeated Flink failovers, one long-lived TaskManager had:
   
   ```text
   Heap used / max: 17,495 / 17,582 MB = 99.5%
   G1 old generation collections: 5,922 full GCs
   Source Data Fetcher threads: 250
   Expected Source Data Fetcher threads: <= 16
   ```
   
   Most leaked attempts had exactly 16 stuck fetcher threads, matching:
   
   ```text
   4 Iceberg sources x 4 task slots = 16 leaked fetcher threads per failover attempt
   ```
   
   The heap histogram showed retained source-reader, pool, and Parquet objects consistent with orphaned fetchers:
   
   ```text
   [B byte arrays                                      ~8.9 GB
   MemorySegment                                      ~6.9M instances
   Binary$ByteBufferBackedBinary                      ~10.7M instances
   BinaryStringData / BinarySection                   ~6.8M instances each
   PipelinedSubpartition                              20,000 instances
   LocalBufferPool$SubpartitionBufferRecycler         20,000 instances
   iceberg shaded parquet IntColumnChunkMetaData      ~20,000 instances
   ```
   
   #### Root cause
   
   `IcebergSourceSplitReader#wakeUp()` is empty:
   
   ```java
   @Override
   public void wakeUp() {}
   ```
   
   `ArrayPoolDataIteratorBatcher.ArrayPoolBatchIterator#getCachedEntry()` blocks indefinitely:
   
   ```java
   private T[] getCachedEntry() {
     try {
       return pool.pollEntry();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException("Interrupted while waiting for array pool entry", e);
     }
   }
   ```
   
   If cancellation happens after downstream has stopped recycling fetched batches, the pool can remain empty. Since `wakeUp()` does not update any state visible to `getCachedEntry()`, the fetcher cannot leave `Pool#pollEntry()`.
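
   To illustrate the mechanism outside Flink, here is a minimal, self-contained sketch. It assumes, as the stack trace above shows, that `Pool#pollEntry()` ends in `ArrayBlockingQueue.take()`, and models the pool with a plain empty `ArrayBlockingQueue`. The class `BlockedPollDemo` and its no-op `wakeUp()` stand-in are hypothetical. A no-op wakeup leaves the "fetcher" parked; only an interrupt or a recycled entry would release `take()`.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;

   // Hypothetical model of the hang: a fetcher parked on an empty pool that nobody refills.
   public class BlockedPollDemo {

     // Mirrors the current IcebergSourceSplitReader#wakeUp(): it does nothing.
     static void wakeUp() {}

     /** Returns true if the fetcher thread is still blocked waitMillis after wakeUp(). */
     static boolean fetcherStillBlockedAfterWakeUp(long waitMillis) {
       // Empty pool: models batches that downstream no longer recycles after cancellation.
       ArrayBlockingQueue<Object[]> pool = new ArrayBlockingQueue<>(1);

       Thread fetcher = new Thread(() -> {
         try {
           pool.take(); // the call Pool#pollEntry() ends up in, per the stack trace
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       }, "Source Data Fetcher (demo)");
       fetcher.setDaemon(true);
       fetcher.start();

       wakeUp(); // cooperative wakeup during cancellation: has no effect on take()
       try {
         fetcher.join(waitMillis);
         return fetcher.isAlive();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       } finally {
         fetcher.interrupt(); // demo cleanup only, so the JVM does not keep the thread around
       }
     }

     public static void main(String[] args) {
       System.out.println("fetcher still blocked after wakeUp(): " + fetcherStillBlockedAfterWakeUp(1_000));
     }
   }
   ```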
   
   #### Suggested fix
   
   Make the Iceberg Flink source reader wakeable while waiting for array-pool entries.
   
   One possible approach:
   
   1. Add a wakeup flag to the source reader and/or current batch iterator.
   2. Implement `IcebergSourceSplitReader#wakeUp()` to set the flag and propagate it to the current iterator.
   3. Replace the indefinite `pool.pollEntry()` wait with a bounded poll loop that periodically checks the wakeup flag.
   4. On wakeup, throw or otherwise return control so the closed `SplitFetcher` can exit cleanly.
   
   Conceptually:
   
   ```java
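   // `wakeUp` is a volatile flag set by wakeUp(); pollEntry(Duration) is an
   // assumed timed variant of Pool#pollEntry() (see the note below).
   while (!wakeUp) {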
     T[] entry = pool.pollEntry(Duration.ofSeconds(10));
     if (entry != null) {
       return entry;
     }
   }
   throw new RuntimeException("Woken up while waiting for array pool entry");
   ```
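
   The conceptual loop above can be expanded into a small runnable model of steps 1 to 4. This is a hypothetical sketch, not Iceberg code: `WakeableBatchIterator` and `WakeableSplitReader` are illustrative stand-ins for `ArrayPoolBatchIterator` and `IcebergSourceSplitReader`, and the JDK's timed `ArrayBlockingQueue.poll(long, TimeUnit)` stands in for a timed pool poll, which is assumed rather than taken from Flink's `Pool` API. The reader also keeps its own flag so that a `wakeUp()` arriving before the iterator is registered is not lost.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical model of the suggested fix; class names are illustrative, not Iceberg's.
   public class WakeableFetchSketch {

     /** Stand-in for ArrayPoolBatchIterator: waits for pool entries but can be woken up. */
     static final class WakeableBatchIterator {
       private final ArrayBlockingQueue<Object[]> pool;
       private final long pollMillis;
       private volatile boolean wokenUp = false; // step 1: written by wakeUp(), read by the fetcher

       WakeableBatchIterator(ArrayBlockingQueue<Object[]> pool, long pollMillis) {
         this.pool = pool;
         this.pollMillis = pollMillis;
       }

       void wakeUp() {
         wokenUp = true;
       }

       // Step 3: bounded poll loop instead of an indefinite take().
       Object[] getCachedEntry() {
         try {
           while (!wokenUp) {
             Object[] entry = pool.poll(pollMillis, TimeUnit.MILLISECONDS);
             if (entry != null) {
               return entry;
             }
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException("Interrupted while waiting for array pool entry", e);
         }
         // Step 4: return control to the caller so a closed fetcher can exit.
         throw new IllegalStateException("Woken up while waiting for array pool entry");
       }
     }

     /** Stand-in for IcebergSourceSplitReader: forwards wakeUp() to the current iterator. */
     static final class WakeableSplitReader {
       private volatile boolean wokenUp = false;
       private volatile WakeableBatchIterator current;

       void setCurrent(WakeableBatchIterator iterator) {
         current = iterator;
         if (wokenUp) {
           iterator.wakeUp(); // do not lose a wakeUp() that arrived before registration
         }
       }

       // Step 2: set the reader flag and propagate it to the iterator that may be blocked.
       void wakeUp() {
         wokenUp = true;
         WakeableBatchIterator iterator = current;
         if (iterator != null) {
           iterator.wakeUp();
         }
       }
     }

     /** Returns ms a blocked fetcher needed to exit after wakeUp(), or -1 if it stayed blocked. */
     static long millisToExitAfterWakeUp(long pollMillis) {
       ArrayBlockingQueue<Object[]> emptyPool = new ArrayBlockingQueue<>(1);
       WakeableSplitReader reader = new WakeableSplitReader();
       WakeableBatchIterator iterator = new WakeableBatchIterator(emptyPool, pollMillis);
       reader.setCurrent(iterator);

       Thread fetcher = new Thread(() -> {
         try {
           iterator.getCachedEntry();
         } catch (IllegalStateException expected) {
           // Woken up: the fetch returns control instead of parking forever.
         }
       }, "Source Data Fetcher (demo)");
       fetcher.setDaemon(true);
       fetcher.start();
       try {
         Thread.sleep(pollMillis); // let the fetcher start waiting on the empty pool
         long start = System.nanoTime();
         reader.wakeUp();
         fetcher.join(10 * pollMillis + 1_000);
         return fetcher.isAlive() ? -1 : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       }
     }

     public static void main(String[] args) {
       System.out.println("fetcher exited " + millisToExitAfterWakeUp(100) + " ms after wakeUp()");
     }
   }
   ```

   The poll interval bounds how long a wakeup can go unnoticed (roughly one `pollMillis`). This sketch treats any wakeup as terminal for the current iterator; a real fix would also need to decide when, if ever, the flag should be cleared.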
   
   If the current Flink `Pool` API does not expose timed polling in the supported dependency version, Iceberg may need an equivalent wakeable wrapper or a bounded `tryPollEntry()` loop.
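
   A bounded `tryPollEntry()` loop could look roughly like the hypothetical helper below. `TryPollWait.awaitEntry` is an illustrative name: it takes the non-blocking poll as a `Supplier` (for example `pool::tryPollEntry`, assuming that method returns `null` while the pool is empty) and the wakeup flag as a `BooleanSupplier`, and backs off with `LockSupport.parkNanos` between attempts. An empty `ArrayBlockingQueue` stands in for the pool in `main`.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.locks.LockSupport;
   import java.util.function.BooleanSupplier;
   import java.util.function.Supplier;

   // Hypothetical helper: a wakeable wait built only from a non-blocking poll.
   public final class TryPollWait {

     private TryPollWait() {}

     static <T> T awaitEntry(Supplier<T> tryPoll, BooleanSupplier wokenUp, long backoffMillis) {
       while (!wokenUp.getAsBoolean()) {
         T entry = tryPoll.get(); // assumed to return null while the pool is empty
         if (entry != null) {
           return entry;
         }
         // Short back-off between attempts; also bounds how long a wakeup goes unnoticed.
         LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(backoffMillis));
         if (Thread.currentThread().isInterrupted()) {
           throw new RuntimeException("Interrupted while waiting for array pool entry");
         }
       }
       throw new IllegalStateException("Woken up while waiting for array pool entry");
     }

     public static void main(String[] args) {
       ArrayBlockingQueue<Object[]> emptyPool = new ArrayBlockingQueue<>(1); // stands in for the pool
       AtomicBoolean wakeUp = new AtomicBoolean(false);

       // Models IcebergSourceSplitReader#wakeUp() being called during cancellation.
       Thread canceller = new Thread(() -> {
         LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
         wakeUp.set(true);
       });
       canceller.start();

       try {
         awaitEntry(() -> emptyPool.poll(), wakeUp::get, 10);
       } catch (IllegalStateException e) {
         System.out.println("fetcher released: " + e.getMessage());
       }
     }
   }
   ```

   Compared with a timed blocking poll, this trades up to one back-off interval of extra latency after an entry is recycled for not needing any new `Pool` API.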
   
   A similar issue appears to have been fixed in Apache Paimon using a timed poll plus wakeup flag pattern:
   
   ```text
   https://www.mail-archive.com/[email protected]/msg03622.html
   ```
   
   #### Impact
   
   This is an unbounded per-JVM leak across Flink failovers. The original failover trigger does not need to be Iceberg-related. Once a cancellation happens while a fetcher is blocked in the array-pool wait, each later failover can leak approximately:
   
   ```text
   number_of_sources x task_slots_per_TaskManager
   ```
   
   fetcher threads per TaskManager on each failover attempt.
   
   The only reliable operational recovery we observed was restarting the affected TaskManager JVMs.
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time

