karuppayya opened a new issue, #15988:
URL: https://github.com/apache/iceberg/issues/15988

   ### Apache Iceberg version
   
   1.10.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   `read.split.adaptive-size.enabled` (default `true`) computes 
[parallelism](https://github.com/apache/iceberg/blob/main/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java#L293)
 via `SparkReadConf.parallelism()`:
   ```
     public int parallelism() {
         int defaultParallelism = spark.sparkContext().defaultParallelism();
         int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
         return Math.max(defaultParallelism, numShufflePartitions);
     }
   ```
     **This is enabled by default and is a table-level property only** — there 
is no session-level config  to disable it globally. Users must ALTER TABLE 
every table **individually**.
   
   
   On a cluster (with say 2 executors/2 cores before upscaling), the 
parallelism becomes max(4, 200) = 200, creating many small tasks for modest 
scans. Spark's dynamic allocator requests new executors to service these tasks, 
but they finish on existing executors before new ones arrive,  particularly 
adverse for streaming micro-batches (500 MB–1 GB range), where the inflation 
repeats every batch.
   
   Example: `spark.sql.shuffle.partitions` = 200 (default):
   
   | Scan Size | Split Size | Tasks (actual) | Tasks (128 MB default) | 
Inflation |
   
|-----------|------------|----------------|-------------------------------|-----------|
   | 500 MB    | 16 MB      | 32             | 4                             | 
8×        |
   | 1 GB      | 16 MB      | 64             | 8                             | 
8×        |
   | 5 GB      | 25 MB      | 200            | 40                            | 
5×        |
   | 10 GB     | 50 MB      | 200            | 80                            | 
2.5×      |
   
   **Users commonly set shuffle partitions higher (400–2000) for shuffle-heavy 
workloads, relying on AQE coalescing to reduce actual shuffle parallelism at 
runtime. But Iceberg reads numShufflePartitions at scan-planning time before 
AQE runs and targets a parallelism the shuffle stage itself might not use.**
   
   
     ### Proposed fix
   
     - Session-level config — Add 
`spark.sql.iceberg.adaptive-split-size.enabled`  so users can disable adaptive 
sizing  globally without per-table ALTER TABLE.
     - Dedicated read parallelism config — Add 
`spark.sql.iceberg.read.split.parallelism` (defaulting to defaultParallelism) 
to give users explicit control over read-stage parallelism. This would make 
task count computation consistent and resilient to shuffle partition count 
changes( across Spark versions (e.g. Spark could change the 200 default) or 
from user-configured values for shuffle-heavy workloads).
   
   Related discussion 
[thread](https://github.com/apache/iceberg/pull/7714#discussion_r1275506085)
   cc: @aokolnychyi @rdblue @ConeyLiu Thoughts?
   
   ### Willingness to contribute
   
   - [x] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to