wzx140 opened a new issue, #11763:
URL: https://github.com/apache/iceberg/issues/11763

   ### Apache Iceberg version
   
   1.5.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   **Description**:
   In Spark’s optimization rule *PartitionPruning*, the method 
`SparkBatchQueryScan#filterAttributes` is called, which triggers the 
computation of `Set<PartitionSpec> specs`. During this process, it iterates 
over each file and parses the jsonString into `PartitionSpec`. To avoid 
repeated parsing, a cache map was added in 
`org.apache.iceberg.PartitionSpecParser#fromJson` with  `(schema, jsonStr) -> 
PartitionSpec`.
   
   However, when dealing with tables containing a large number of files and 
columns, **calculating the schema hash can consume significant CPU time**.
   
   **Proposed Solution**:
   Avro Schema mitigates this issue by **caching the schema’s hashCode** to 
avoid repeated computations. A similar optimization could be applied to 
Iceberg’s schema to reduce the performance regression caused by frequent schema 
hash calculations.
   
   **Reproduction Example**:
   I added a timer to the method 
`org.apache.iceberg.spark.source.SparkPartitioningAwareScan`
   
   ```java
     protected Set<PartitionSpec> specs() {
       if (specs == null) {
         long ts = System.currentTimeMillis();
         // avoid calling equals/hashCode on specs as those methods are 
relatively expensive
         IntStream specIds = tasks().stream().mapToInt(task -> 
task.spec().specId()).distinct();
         this.specs = specIds.mapToObj(id -> 
table().specs().get(id)).collect(Collectors.toSet());
         LOG.warn("Scanned {} specs in {} ms", specs.size(), 
System.currentTimeMillis() - ts);
       }
   
       return specs;
     }
   ```
   
   and tested the following SQL query on a table with 900,000 files and 1500+ 
columns:
   
   ```sql
   SELECT SUM(HASH(s.reqId + t.reqId)) 
   FROM table s
   JOIN table t
   ON s.reqId = t.reqId and s.partition = 'part1' and t.partition = 'part1'
   ```
   
   This query triggers 
`org.apache.spark.sql.execution.dynamicpruning.PartitionPruning` optimization 
rule twice. Before the task execution, **the driver spends approximately 150 
seconds on pre-execution preparation, with over 140 seconds consumed in 
calculating PartitionSpec**.
   
   **Flame Graph**:
   <img width="1279" alt="1" 
src="https://github.com/user-attachments/assets/636e891d-48b6-4fb2-9bf8-e444769082a4";
 />
   
   **Thread Dump**:
   ```
   java.base@17.0.9/java.util.Arrays.hashCode(Arrays.java:4499)
   java.base@17.0.9/java.util.Objects.hash(Objects.java:133)
   org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
   java.base@17.0.9/java.util.Arrays.hashCode(Arrays.java:4499)
   java.base@17.0.9/java.util.Objects.hash(Objects.java:133)
   org.apache.iceberg.types.Types$ListType.hashCode(Types.java:763)
   java.base@17.0.9/java.util.Arrays.hashCode(Arrays.java:4499)
   java.base@17.0.9/java.util.Objects.hash(Objects.java:133)
   org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
   java.base@17.0.9/java.util.Arrays.hashCode(Arrays.java:4499)
   org.apache.iceberg.types.Types$StructType.hashCode(Types.java:630)
   java.base@17.0.9/java.util.Arrays.hashCode(Arrays.java:4499)
   
org.apache.iceberg.relocated.com.google.common.base.Objects.hashCode(Objects.java:79)
   org.apache.iceberg.util.Pair.hashCode(Pair.java:117)
   
java.base@17.0.9/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
   
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2370)
   
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
   
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
   org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
   org.apache.iceberg.BaseContentScanTask.spec(BaseContentScanTask.java:71) => 
holding Monitor(org.apache.iceberg.BaseFileScanTask@520850087)
   org.apache.iceberg.BaseFileScanTask.spec(BaseFileScanTask.java:34)
   
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.lambda$specs$1(SparkPartitioningAwareScan.java:165)
   
org.apache.iceberg.spark.source.SparkPartitioningAwareScan$$Lambda$3617/0x00007f58ed482c28.applyAsInt(Unknown
 Source)
   
java.base@17.0.9/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:214)
   
java.base@17.0.9/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
   
java.base@17.0.9/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
   
java.base@17.0.9/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
   
java.base@17.0.9/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
   
java.base@17.0.9/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
   
java.base@17.0.9/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
   
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.specs(SparkPartitioningAwareScan.java:166)
   
org.apache.iceberg.spark.source.SparkBatchQueryScan.filterAttributes(SparkBatchQueryScan.java:103)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$$Lambda$3616/0x00007f58ed48c3b0.apply(Unknown
 Source)
   app//scala.Option.flatMap(Option.scala:271)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.getFilterableTableScan(PartitionPruning.scala:62)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1(PartitionPruning.scala:258)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1$adapted(PartitionPruning.scala:241)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1$$Lambda$3615/0x00007f58ed48bfd0.apply(Unknown
 Source)
   app//scala.collection.immutable.List.foreach(List.scala:431)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:241)
   
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:219)
   ```
   
   **Environment**:
   I tested this issue on Iceberg 1.5.0, and it is expected to persist in the 
latest version as well.
   
   ### Willingness to contribute
   
   - [X] I can contribute a fix for this bug independently
   - [X] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to