sopel39 opened a new issue, #11768:
URL: https://github.com/apache/iceberg/issues/11768

   ### Apache Iceberg version
   
   1.7.1 (latest release)
   
   ### Query engine
   
   Trino
   
   ### Please describe the bug 🐞
   
   The `ParallelIterable` implementation is really complicated and has subtle concurrency bugs.
   
   # Context #1
   It was observed that in high-concurrency, high-workload scenarios, cluster concurrency drops to 0 or 1 due to S3 `Timeout waiting for connection from pool` errors. Once this starts happening, it keeps going, effectively making the cluster unusable.
   
   # Context #2
   
   `ManifestGroup#plan` creates a `ManifestReader` per `ParallelIterable.Task`. Each reader effectively holds an S3 connection from the pool. When the `ParallelIterable` queue is full, the `Task` is tabled for later use. The number of tasks is not bounded by the worker pool size, but by `X = num(ParallelIterable instances) * size(ParallelIterator#taskFutures)`. One can see that `X` becomes significant with a high number of concurrent queries, as the illustration below shows.
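
   For a rough illustration (all numbers hypothetical): 100 concurrent queries, each with a single `ParallelIterable` whose `taskFutures` array has 8 slots, allow up to `X = 100 * 1 * 8 = 800` tabled tasks, each potentially pinning an open `ManifestReader` and its S3 connection against a connection pool that is typically far smaller.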
   
   # Issue #1
   
   `ParallelIterable` is not batch based. This means it will produce read-ahead results even when the downstream consumer has no slots for them, which can lead to subtle concurrency issues. For instance, consider two parallel iterables `P1` and `P2`, and a single-threaded reader that consumes 500 elements from `P1`, then `P2`, then `P1`, and so on (these could be splits, for instance). If `P1`'s queue becomes full, `P1` stops fetching more elements while still holding onto its tasks (which in turn hold S3 connections). This prevents tasks from `P2` from completing (because there are no "free" S3 slots).
   
   Consider the following scenario:
   `S3 connection pool size=1`
   `approximateMaxQueueSize=1`
   `workerPoolSize=1`

   P1: starts `TaskP1`
   P1: produces a result; queue is full, so `TaskP1` is put on hold (still holding an S3 connection)
   P2: starts `TaskP2`; `TaskP2` blocks on the S3 connection pool
   P1: result is consumed; `TaskP1` is scheduled again
   P1: `TaskP1` waits for a worker pool slot to free up, but `TaskP2` is waiting for `TaskP1` to release its connection
   DEADLOCK
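
   The cycle can be reproduced outside Iceberg. Below is a minimal, self-contained Java sketch (all names are hypothetical; this is not Iceberg code) that models the S3 pool as a `Semaphore`, the iterator queue as a bounded `ArrayBlockingQueue`, and the worker pool as a single-threaded executor. Run as written, it times out and reports the deadlock:
   ```java
   import java.util.concurrent.*;

   public class HoldAndWaitDemo {
     public static void main(String[] args) throws Exception {
       Semaphore s3Pool = new Semaphore(1);                          // S3 connection pool size=1
       BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);    // approximateMaxQueueSize=1
       ExecutorService workerPool = Executors.newFixedThreadPool(1); // workerPoolSize=1

       // TaskP1: acquires the connection, produces one result, is tabled because
       // the queue is full -- and keeps its permit, like a tabled Task keeps its
       // open ManifestReader.
       s3Pool.acquire();
       queue.put("result-from-P1");

       // TaskP2: occupies the only worker thread, blocked on the connection pool.
       Future<?> taskP2 = workerPool.submit(() -> {
         try {
           s3Pool.acquire();
           queue.put("result-from-P2");
           s3Pool.release();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       });

       // The consumer frees a queue slot and TaskP1 is rescheduled, but its
       // continuation queues behind the blocked TaskP2 on the single worker thread.
       queue.take();
       workerPool.submit(() -> {
         try {
           queue.put("more-from-P1");
           s3Pool.release(); // would unblock TaskP2 -- never reached
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       });

       try {
         taskP2.get(2, TimeUnit.SECONDS);
         System.out.println("no deadlock");
       } catch (TimeoutException e) {
         System.out.println("DEADLOCK: TaskP2 holds the worker thread while waiting"
             + " for the connection held by the tabled TaskP1");
       }
       workerPool.shutdownNow();
     }
   }
   ```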
   
   # Issue #2
   
   Active waiting. This one is already known. However, if one looks at `ParallelIterable.ParallelIterator#checkTasks`, there is:
   ```java
   if (taskFutures[i] == null || taskFutures[i].isDone()) {
     continuation.ifPresent(yieldedTasks::addLast);
     ...
     taskFutures[i] = submitNextTask();
   }
   ```
   which means the active waiting actually happens through the `workerPool` (i.e. a task is started on the worker pool just to check that the queue is full and that it should be put back on hold).
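
   For reference, here is a paraphrased sketch of the task shape this implies (illustrative names, simplified; not the verbatim Iceberg source). Every resubmission lands on a worker thread only to re-check the queue and, if it is still full, to yield again while the underlying reader stays open:
   ```java
   import java.util.Iterator;
   import java.util.Optional;
   import java.util.Queue;

   // Illustrative reconstruction of the current yield-and-resubmit behavior.
   class YieldingTask<T> {
     private final Iterable<T> iterable; // backed by a ManifestReader / S3 connection
     private final Queue<T> queue;       // shared ParallelIterator queue
     private final int approximateMaxQueueSize;
     private Iterator<T> iterator;       // stays open across yields

     YieldingTask(Iterable<T> iterable, Queue<T> queue, int approximateMaxQueueSize) {
       this.iterable = iterable;
       this.queue = queue;
       this.approximateMaxQueueSize = approximateMaxQueueSize;
     }

     // Returns itself to be tabled when the queue is full; the open iterator
     // (and its connection) is carried along until the task is rescheduled.
     Optional<YieldingTask<T>> call() {
       if (iterator == null) {
         iterator = iterable.iterator(); // resource acquired on first run
       }
       while (iterator.hasNext()) {
         if (queue.size() >= approximateMaxQueueSize) {
           return Optional.of(this);     // tabled mid-iteration, resource still held
         }
         queue.add(iterator.next());
       }
       return Optional.empty();
     }
   }
   ```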
   
   # Short term fix?
   
   Once a `ParallelIterable.Task` is started, it should continue until the entire task is consumed. This prevents putting limited resources on hold. The `if (queue.size() >= approximateMaxQueueSize) {` check should happen only once per task, before the iterator is created; see the sketch below.
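
   A minimal sketch of that shape, using the same illustrative names as above (an assumption of what the fix could look like, not an actual patch):
   ```java
   import java.util.Optional;
   import java.util.Queue;

   // The back-pressure check moves before the iterator (and thus the S3-backed
   // reader) is opened; once started, the task drains to completion, so a
   // tabled task never holds a connection.
   class DrainingTask<T> {
     private final Iterable<T> iterable;
     private final Queue<T> queue;
     private final int approximateMaxQueueSize;

     DrainingTask(Iterable<T> iterable, Queue<T> queue, int approximateMaxQueueSize) {
       this.iterable = iterable;
       this.queue = queue;
       this.approximateMaxQueueSize = approximateMaxQueueSize;
     }

     Optional<DrainingTask<T>> call() {
       if (queue.size() >= approximateMaxQueueSize) {
         return Optional.of(this);  // yield while no resource is held yet
       }
       for (T item : iterable) {    // open the reader and run to completion
         queue.add(item);
       }
       return Optional.empty();
     }
   }
   ```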
   
   # Long term fix?
   
   Perhaps the code can be refactored to be more readable and streamlined?
   
   cc @findepi @raunaqmorarka
   
   
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time

