shanzi opened a new issue, #16426:
URL: https://github.com/apache/iceberg/issues/16426
### Apache Iceberg version
1.10.2
### Query engine
Flink
### Please describe the bug 🐞
When a Flink task using the Iceberg Flink source is cancelled while the
source fetcher is blocked waiting for an array-pool entry, the fetcher thread
is not woken up.
`IcebergSourceSplitReader#wakeUp()` is currently a no-op, while
`ArrayPoolDataIteratorBatcher.ArrayPoolBatchIterator#getCachedEntry()` calls
`Pool#pollEntry()`, which blocks indefinitely. Flink’s source fetcher shutdown
path relies on `SplitReader#wakeUp()` to cooperatively unblock the fetcher.
Because Iceberg does not unblock the current batch iterator, the fetcher can
remain parked forever after cancellation.
Over repeated Flink failovers, this leaks `Source Data Fetcher` threads,
together with the Parquet buffers, S3 streams, and operator-chain references
they retain, until TaskManagers become GC-bound or die.
#### Expected behavior
Calling `IcebergSourceSplitReader#wakeUp()` during task cancellation should
unblock any fetcher waiting for an array-pool entry, close the current reader,
and allow the `Source Data Fetcher` thread to exit cleanly.
#### Actual behavior
The fetcher thread remains stuck indefinitely in:
```text
"Source Data Fetcher ..." WAITING (parking)
	at java.util.concurrent.ArrayBlockingQueue.take
	at org.apache.flink.connector.file.src.util.Pool.pollEntry(Pool.java:82)
	at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.getCachedEntry(...)
	at org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher$ArrayPoolBatchIterator.next(...)
	at org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(...)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(...)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(...)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(...)
```
Flink then logs:
```text
Failed to close the source reader in 30000 ms. There are still 1 split fetchers running
```
After that timeout, cancellation proceeds, but the fetcher thread remains
alive and retains memory/resources.
#### Production evidence
Observed in production with:
```text
Flink: 1.19.3
Iceberg: 1.10.1
Java: 17
Parallelism: 80
TaskManagers: 20 x 4 slots
Source pattern: multiple Iceberg source operators reading Parquet from S3-backed tables
```
After repeated Flink failovers, one long-lived TaskManager had:
```text
Heap used / max: 17,495 / 17,582 MB = 99.5%
G1 old generation collections: 5,922 full GCs
Source Data Fetcher threads: 250
Expected Source Data Fetcher threads: <= 16
```
Most leaked attempts had exactly 16 stuck fetcher threads, matching:
```text
4 Iceberg sources x 4 task slots = 16 leaked fetcher threads per failover attempt
```
The heap histogram showed retained source-reader/pool/parquet objects
consistent with orphaned fetchers:
```text
[B byte arrays ~8.9 GB
MemorySegment ~6.9M instances
Binary$ByteBufferBackedBinary ~10.7M instances
BinaryStringData / BinarySection ~6.8M instances each
PipelinedSubpartition 20,000 instances
LocalBufferPool$SubpartitionBufferRecycler 20,000 instances
iceberg shaded parquet IntColumnChunkMetaData ~20,000 instances
```
#### Root cause
`IcebergSourceSplitReader#wakeUp()` is empty:
```java
@Override
public void wakeUp() {}
```
`ArrayPoolDataIteratorBatcher.ArrayPoolBatchIterator#getCachedEntry()`
blocks indefinitely:
```java
private T[] getCachedEntry() {
    try {
        return pool.pollEntry();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for array pool entry", e);
    }
}
```
If cancellation happens after downstream has stopped recycling fetched
batches, the pool can remain empty. Since `wakeUp()` does not update any state
visible to `getCachedEntry()`, the fetcher cannot leave `Pool#pollEntry()`.
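The hang can be reproduced without Flink or Iceberg. The sketch below assumes the pool behaves like a drained `ArrayBlockingQueue` (the stack trace shows `Pool#pollEntry()` parking in `ArrayBlockingQueue.take`) and pairs it with a no-op `wakeUp()` mirroring the current reader. `NoOpWakeUpRepro`, `startFetcher()` and the thread name are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NoOpWakeUpRepro {

    // Stand-in for the array pool: capacity 1, already empty, and nothing
    // downstream will ever recycle an entry back into it.
    static final BlockingQueue<Object[]> POOL = new ArrayBlockingQueue<>(1);

    // Mirrors the current IcebergSourceSplitReader#wakeUp(): does nothing.
    public static void wakeUp() {}

    public static Thread startFetcher() {
        Thread fetcher = new Thread(() -> {
            try {
                POOL.take(); // parks like Pool#pollEntry() in the stack trace
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Source Data Fetcher (repro)");
        fetcher.setDaemon(true); // lets the JVM exit even though the thread leaks
        fetcher.start();
        return fetcher;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread fetcher = startFetcher();
        Thread.sleep(200); // give the fetcher time to park
        wakeUp();          // cooperative shutdown signal that nothing observes
        fetcher.join(1_000);
        System.out.println("alive=" + fetcher.isAlive() + " state=" + fetcher.getState());
    }
}
```

Running `main` prints `alive=true state=WAITING`: the only way out of `take()` is an entry or an interrupt, and the shutdown path provides neither.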
#### Suggested fix
Make the Iceberg Flink source reader wakeable while waiting for array-pool
entries.
One possible approach:
1. Add a wakeup flag to the source reader and/or current batch iterator.
2. Implement `IcebergSourceSplitReader#wakeUp()` to set the flag and
propagate it to the current iterator.
3. Replace the indefinite `pool.pollEntry()` wait with a bounded poll loop
that periodically checks the wakeup flag.
4. On wakeup, throw or otherwise return control so the closed `SplitFetcher`
can exit cleanly.
Conceptually:
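```java
// Conceptual only: `wakeUp` is a volatile flag set by wakeUp(), and a timed
// pollEntry(Duration) overload on Pool is assumed (see the note below).
while (!wakeUp) {
    T[] entry = pool.pollEntry(Duration.ofSeconds(10));
    if (entry != null) {
        return entry;
    }
}
throw new RuntimeException("Woken up while waiting for array pool entry");
```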
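A runnable version of the conceptual loop above, as a minimal sketch. It assumes the pool can be backed by a `java.util.concurrent.ArrayBlockingQueue`, so the JDK's timed `poll(long, TimeUnit)` stands in for the hypothetical `pollEntry(Duration)`. `WakeablePool`, `recycle()` and the reset-on-observe behavior of the flag are illustrative assumptions, not Iceberg or Flink API; propagating the call from `IcebergSourceSplitReader#wakeUp()` to the current iterator (step 2) is not shown.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical wakeable array pool illustrating steps 1, 3 and 4 of the fix. */
public class WakeablePool<T> {

    private final BlockingQueue<T> entries;
    private final long pollTimeoutMillis;
    // Step 1: wakeup flag, written by the task thread, read by the fetcher thread.
    private volatile boolean wokenUp = false;

    public WakeablePool(int capacity, long pollTimeoutMillis) {
        this.entries = new ArrayBlockingQueue<>(capacity);
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /** Returns an entry to the pool (the downstream "recycle" path). */
    public void recycle(T entry) {
        entries.add(entry);
    }

    /** What the reader's wakeUp() would call on the current iterator's pool. */
    public void wakeUp() {
        wokenUp = true;
    }

    /**
     * Step 3: bounded poll loop that re-checks the flag after each timeout.
     * Step 4: returns null once woken up so the caller can hand control back.
     */
    public T pollEntry() throws InterruptedException {
        while (!wokenUp) {
            T entry = entries.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (entry != null) {
                return entry;
            }
        }
        wokenUp = false; // consume the signal so later fetches can use the pool again
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        WakeablePool<Object[]> pool = new WakeablePool<>(1, 50);
        Thread fetcher = new Thread(() -> {
            try {
                System.out.println("fetcher returned " + pool.pollEntry());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Source Data Fetcher (sketch)");
        fetcher.start();
        Thread.sleep(200);
        pool.wakeUp(); // the signal IcebergSourceSplitReader#wakeUp() would send
        fetcher.join(1_000);
        System.out.println("alive=" + fetcher.isAlive());
    }
}
```

Returning `null` instead of throwing is a choice made for this sketch: Flink may call `SplitReader#wakeUp()` in situations other than shutdown (for example when new splits are assigned), so a caller might prefer to turn `null` into an empty fetch result rather than a task failure. The wakeup latency is bounded by `pollTimeoutMillis`.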
If the current Flink `Pool` API does not expose timed polling in the
supported dependency version, Iceberg may need an equivalent wakeable wrapper
or a bounded `tryPollEntry()` loop.
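If only a non-blocking poll is available, a bounded loop with a short back-off gives the same wakeup behavior. This sketch only assumes a poll that returns `null` when the pool is empty, shaped like the `tryPollEntry()` mentioned above; `BoundedTryPoll` and `pollUntilWokenUp` are hypothetical names, and `ArrayBlockingQueue::poll` stands in for the real pool.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public final class BoundedTryPoll {

    private BoundedTryPoll() {}

    /**
     * Calls a non-blocking poll until it yields an entry, the wakeup check
     * fires, or the thread is interrupted. Returns null in the latter two cases.
     */
    public static <T> T pollUntilWokenUp(
            Supplier<T> tryPoll, BooleanSupplier wokenUp, long backoffMillis) {
        // Checking the interrupt status avoids a busy spin, because parkNanos
        // returns immediately while the interrupt flag is set.
        while (!wokenUp.getAsBoolean() && !Thread.currentThread().isInterrupted()) {
            T entry = tryPoll.get();
            if (entry != null) {
                return entry;
            }
            // Back off briefly; an early return from parkNanos is harmless
            // because the loop re-checks every condition.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(backoffMillis));
        }
        return null;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("entry");
        // queue::poll plays the role of a non-blocking tryPollEntry().
        System.out.println(pollUntilWokenUp(queue::poll, () -> false, 10));
        System.out.println(pollUntilWokenUp(queue::poll, () -> true, 10));
    }
}
```

Compared with a timed blocking poll, this trades a small amount of periodic wakeups for not depending on any timed-poll method on the pool.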
A similar issue appears to have been fixed in Apache Paimon using a timed
poll plus wakeup flag pattern:
```text
https://www.mail-archive.com/[email protected]/msg03622.html
```
#### Impact
This is an unbounded per-JVM leak across Flink failovers. The original
failover trigger does not need to be Iceberg-related. Once a cancellation
happens while a fetcher is blocked in the array-pool wait, each later failover
can leak approximately:
```text
number_of_sources x task_slots_per_TaskManager
```
fetcher threads per TaskManager per failover attempt.
The only reliable operational recovery we observed was restarting affected
TaskManager JVMs.
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time