cccs-jc opened a new issue, #10828:
URL: https://github.com/apache/iceberg/issues/10828

   ### Feature Request / Improvement
   
   
   Prior to the introduction of ColumnarToRow in Spark 3.0.0, columnar data was converted into Spark's internal rows using generated code that copies data from ColumnarBatch vectors into an internal row.
   
   In Spark 3.0.0 an optimization was introduced which provides a row iterator that retrieves values directly from the ColumnarBatch vectors, thus avoiding the copy.
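   For reference, here is a minimal sketch (assuming any Spark 3.x DataFrame; the helper name is illustrative, not an existing API) of how the presence of the node can be checked programmatically by capturing the formatted explain output:
   
   ```
   import io
   from contextlib import redirect_stdout
   
   def plan_has_columnar_to_row(df) -> bool:
       # Capture df.explain(mode="formatted") output and check for a ColumnarToRow node.
       buf = io.StringIO()
       with redirect_stdout(buf):
           df.explain(mode="formatted")
       return "ColumnarToRow" in buf.getvalue()
   ```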
   
   This optimization is not used when reading from Iceberg tables when a filter is applied to a nested structure. When reading the same data as a plain Parquet "spark table", the optimization is applied. When reading through Iceberg, the optimization is also applied, but not if the filter is on a sub-field of a struct. The code below reproduces the issue.
   
   ```
   from pyspark.sql import SparkSession
   import pyspark.sql.functions as F
   import time
   
   spark = (SparkSession
       .builder
       .config("spark.sql.catalog.local", 
"org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.local.warehouse", "file:///tmp/iceberg")
       .config("spark.sql.catalog.local.type", "hadoop")
       .getOrCreate()
       )
   
   spark.sql("drop table if exists local.jcc.test")
   
   spark.sql("""
   create table local.jcc.test (
       id long,
       src_start_num long,
       source struct<sub1 string, start_num long> )
   using iceberg
   """)
   
   # place the value of src_start_num inside a struct
   spark.sql("""
   insert into table local.jcc.test
   select
       id,
       src_start_num,
       named_struct(
           'sub1', uuid(),
           'start_num', src_start_num
       ) as source
   from (
       select
           id,
           floor(rand() * 100000000) as src_start_num
       from
           range(1, 1000000000)
   )
   """)
   ```
   
   
   ```
   print("------ warm up ------")
   df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
   df.count()
   df = spark.read.format("iceberg").load("local.jcc.test")
   df.count()
   
   
   print("--------- parquet root filter, explain plan contains a ColumnarToRow 
----------")
   start = time.time()
   spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
   df = df.select("id").where("src_start_num = 300000")
   df.write.format("noop").mode("append").save("/dev/null")
   end = time.time()
   print(f"took: {end-start} seconds")
   df.explain(mode="formatted")
   
   print("--------- parquet sub filter, explain plan contains a ColumnarToRow 
----------")
   start = time.time()
   df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
   df = df.select("id").where("source.start_num = 300000")
   df.write.format("noop").mode("append").save("/dev/null")
   end = time.time()
   print(f"took: {end-start} seconds")
   df.explain(mode="formatted")
   
   print("--------- iceberg root filter, explain plan contains a ColumnarToRow 
----------")
   start = time.time()
   df = spark.read.format("iceberg").load("local.jcc.test")
   df = df.select("id").where("src_start_num = 300000")
   df.write.format("noop").mode("append").save("/dev/null")
   end = time.time()
   print(f"took: {end-start} seconds")
   df.explain(mode="formatted")
   
   print("--------- iceberg sub filter, explain plan DOES NOT contain 
ColumnarToRow ----------")
   start = time.time()
   df = spark.read.format("iceberg").load("local.jcc.test")
   df = df.select("id").where("source.start_num = 300000")
   df.write.format("noop").mode("append").save("/dev/null")
   end = time.time()
   print(f"took: {end-start} seconds")
   df.explain(mode="formatted")
   ```
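   
   Using the helper sketched above (a usage sketch, not part of the original benchmark), the difference between the two Iceberg queries can be asserted directly:
   
   ```
   iceberg_root = (spark.read.format("iceberg").load("local.jcc.test")
       .select("id").where("src_start_num = 300000"))
   iceberg_sub = (spark.read.format("iceberg").load("local.jcc.test")
       .select("id").where("source.start_num = 300000"))
   
   # The root-level filter keeps the ColumnarToRow node; the sub-field filter loses it.
   print(plan_has_columnar_to_row(iceberg_root))   # True
   print(plan_has_columnar_to_row(iceberg_sub))    # False
   ```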
   
   
   
   ```
   ------ warm up ------
   --------- parquet root filter ----------
   took: 22.319010496139526 seconds
   == Physical Plan ==
   * Project (4)
   +- * Filter (3)
      +- * ColumnarToRow (2)
         +- Scan parquet  (1)
   
   
   (1) Scan parquet 
   Output [2]: [id#37L, src_start_num#38L]
   Batched: true
   Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
   PushedFilters: [IsNotNull(src_start_num), EqualTo(src_start_num,300000)]
   ReadSchema: struct<id:bigint,src_start_num:bigint>
   
   (2) ColumnarToRow [codegen id : 1]
   Input [2]: [id#37L, src_start_num#38L]
   
   (3) Filter [codegen id : 1]
   Input [2]: [id#37L, src_start_num#38L]
   Condition : (isnotnull(src_start_num#38L) AND (src_start_num#38L = 300000))
   
   (4) Project [codegen id : 1]
   Output [1]: [id#37L]
   Input [2]: [id#37L, src_start_num#38L]
   
   
   --------- parquet sub filter ----------
   took: 23.818222522735596 seconds
   == Physical Plan ==
   * Project (4)
   +- * Filter (3)
      +- * ColumnarToRow (2)
         +- Scan parquet  (1)
   
   
   (1) Scan parquet 
   Output [2]: [id#46L, source#48]
   Batched: true
   Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
   PushedFilters: [IsNotNull(source.start_num), EqualTo(source.start_num,300000)]
   ReadSchema: struct<id:bigint,source:struct<start_num:bigint>>
   
   (2) ColumnarToRow [codegen id : 1]
   Input [2]: [id#46L, source#48]
   
   (3) Filter [codegen id : 1]
   Input [2]: [id#46L, source#48]
   Condition : (isnotnull(source#48.start_num) AND (source#48.start_num = 300000))
   
   (4) Project [codegen id : 1]
   Output [1]: [id#46L]
   Input [2]: [id#46L, source#48]
   
   
   --------- iceberg root filter ----------
   took: 24.27478837966919 seconds
   == Physical Plan ==
   * Project (4)
   +- * Filter (3)
      +- * ColumnarToRow (2)
         +- BatchScan local.jcc.test (1)
   
   
   (1) BatchScan local.jcc.test
   Output [2]: [id#62L, src_start_num#63L]
   local.jcc.test (branch=null) [filters=src_start_num IS NOT NULL, src_start_num = 300000, groupedBy=]
   
   (2) ColumnarToRow [codegen id : 1]
   Input [2]: [id#62L, src_start_num#63L]
   
   (3) Filter [codegen id : 1]
   Input [2]: [id#62L, src_start_num#63L]
   Condition : (isnotnull(src_start_num#63L) AND (src_start_num#63L = 300000))
   
   (4) Project [codegen id : 1]
   Output [1]: [id#62L]
   Input [2]: [id#62L, src_start_num#63L]
   
   
   --------- iceberg sub filter ----------
   took: 75.11750912666321 seconds
   == Physical Plan ==
   * Project (3)
   +- * Filter (2)
      +- BatchScan local.jcc.test (1)
   
   
   (1) BatchScan local.jcc.test
   Output [2]: [id#79L, source#81]
   local.jcc.test (branch=null) [filters=source.start_num IS NOT NULL, source.start_num = 300000, groupedBy=]
   
   (2) Filter [codegen id : 1]
   Input [2]: [id#79L, source#81]
   Condition : (isnotnull(source#81.start_num) AND (source#81.start_num = 300000))
   
   (3) Project [codegen id : 1]
   Output [1]: [id#79L]
   Input [2]: [id#79L, source#81]
   ```
   
   
   
   
   The query "iceberg sub filter" does not use the ColumnarToRow optimization 
and it takes much longer to execute.
   
   took: 75.11750912666321 seconds
   == Physical Plan ==
   * Project (3)
   +- * Filter (2)
      +- BatchScan local.jcc.test (1)
   
   
   I'm using Spark 3.5.0 and Iceberg 1.5.
   
   
   ### Query engine
   
   Spark

