cccs-jc opened a new issue, #10828:
URL: https://github.com/apache/iceberg/issues/10828
### Feature Request / Improvement

Prior to the introduction of `ColumnarToRow` in Spark 3.0.0, columnar data was converted into Spark's internal rows using generated code that copies data from `ColumnarBatch` vectors into an internal row. In Spark 3.0.0, an optimization was introduced which provides a row iterator that retrieves values directly from the `ColumnarBatch` vectors, thus avoiding the copy.

This optimization is not used when reading from Iceberg tables when filters are applied to nested structures. When reading the Parquet files directly (a plain "spark table"), the optimization is applied whether the filter is on a root field or on a sub-field. When reading through Iceberg, the optimization is applied for a filter on a root field, however it is not applied if you put a filter on a sub-field of a struct. The code below reproduces the issue.

```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time

spark = (SparkSession
    .builder
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.warehouse", "file:///tmp/iceberg")
    .config("spark.sql.catalog.local.type", "hadoop")
    .getOrCreate()
)

spark.sql("drop table if exists local.jcc.test")

spark.sql("""
create table local.jcc.test (
    id long,
    src_start_num long,
    source struct<sub1 string, start_num long>
)
using iceberg
""")

# place the value of src_start_num inside a struct
spark.sql("""
insert into table local.jcc.test
select
    id,
    src_start_num,
    named_struct(
        'sub1', uuid(),
        'start_num', src_start_num
    ) as source
from (
    select
        id,
        floor(rand() * 100000000) as src_start_num
    from range(1, 1000000000)
)
""")
```

```
print("------ warm up ------")
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df.count()
df = spark.read.format("iceberg").load("local.jcc.test")
df.count()

print("--------- parquet root filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df = df.select("id").where("src_start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- parquet sub filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.parquet("file:///tmp/iceberg/jcc/test/data")
df = df.select("id").where("source.start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- iceberg root filter, explain plan contains a ColumnarToRow ----------")
start = time.time()
df = spark.read.format("iceberg").load("local.jcc.test")
df = df.select("id").where("src_start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")

print("--------- iceberg sub filter, explain plan DOES NOT contain ColumnarToRow ----------")
start = time.time()
df = spark.read.format("iceberg").load("local.jcc.test")
df = df.select("id").where("source.start_num = 300000")
df.write.format("noop").mode("append").save("/dev/null")
end = time.time()
print(f"took: {end-start} seconds")
df.explain(mode="formatted")
```
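For reference, a quick way to check programmatically whether the `ColumnarToRow` node made it into a plan is to look at the string form of the executed physical plan. This is only a convenience sketch: it goes through PySpark's internal `_jdf` handle and `queryExecution()`, which are not public, stable APIs.

```
def plan_has_columnar_to_row(df):
    # String form of the executed physical plan (internal Spark APIs).
    plan = df._jdf.queryExecution().executedPlan().toString()
    return "ColumnarToRow" in plan

# Example, using one of the queries from the script above:
# df = spark.read.format("iceberg").load("local.jcc.test") \
#          .select("id").where("source.start_num = 300000")
# print(plan_has_columnar_to_row(df))   # False for the "iceberg sub filter" case
```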
Here is the output of the script above:

```
------ warm up ------

--------- parquet root filter ----------
took: 22.319010496139526 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet (1)

(1) Scan parquet
Output [2]: [id#37L, src_start_num#38L]
Batched: true
Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
PushedFilters: [IsNotNull(src_start_num), EqualTo(src_start_num,300000)]
ReadSchema: struct<id:bigint,src_start_num:bigint>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#37L, src_start_num#38L]

(3) Filter [codegen id : 1]
Input [2]: [id#37L, src_start_num#38L]
Condition : (isnotnull(src_start_num#38L) AND (src_start_num#38L = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#37L]
Input [2]: [id#37L, src_start_num#38L]

--------- parquet sub filter ----------
took: 23.818222522735596 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet (1)

(1) Scan parquet
Output [2]: [id#46L, source#48]
Batched: true
Location: InMemoryFileIndex [file:/tmp/iceberg/jcc/test/data]
PushedFilters: [IsNotNull(source.start_num), EqualTo(source.start_num,300000)]
ReadSchema: struct<id:bigint,source:struct<start_num:bigint>>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#46L, source#48]

(3) Filter [codegen id : 1]
Input [2]: [id#46L, source#48]
Condition : (isnotnull(source#48.start_num) AND (source#48.start_num = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#46L]
Input [2]: [id#46L, source#48]

--------- iceberg root filter ----------
took: 24.27478837966919 seconds
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan local.jcc.test (1)

(1) BatchScan local.jcc.test
Output [2]: [id#62L, src_start_num#63L]
local.jcc.test (branch=null) [filters=src_start_num IS NOT NULL, src_start_num = 300000, groupedBy=]

(2) ColumnarToRow [codegen id : 1]
Input [2]: [id#62L, src_start_num#63L]

(3) Filter [codegen id : 1]
Input [2]: [id#62L, src_start_num#63L]
Condition : (isnotnull(src_start_num#63L) AND (src_start_num#63L = 300000))

(4) Project [codegen id : 1]
Output [1]: [id#62L]
Input [2]: [id#62L, src_start_num#63L]

--------- iceberg sub filter ----------
took: 75.11750912666321 seconds
== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- BatchScan local.jcc.test (1)

(1) BatchScan local.jcc.test
Output [2]: [id#79L, source#81]
local.jcc.test (branch=null) [filters=source.start_num IS NOT NULL, source.start_num = 300000, groupedBy=]

(2) Filter [codegen id : 1]
Input [2]: [id#79L, source#81]
Condition : (isnotnull(source#81.start_num) AND (source#81.start_num = 300000))

(3) Project [codegen id : 1]
Output [1]: [id#79L]
Input [2]: [id#79L, source#81]
```

The "iceberg sub filter" query does not use the `ColumnarToRow` optimization and takes much longer to execute (75 seconds versus roughly 24 seconds for the other cases):

```
took: 75.11750912666321 seconds
== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- BatchScan local.jcc.test (1)
```

I'm using Spark 3.5.0 and Iceberg 1.5.

### Query engine

Spark
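One extra experiment that might help with triage: the slow path seems tied to the nested predicate being pushed into the Iceberg scan (the `filters=source.start_num IS NOT NULL, source.start_num = 300000` part of the BatchScan above). A crude way to test that is to write the predicate as an expression that cannot be translated into a data source filter, so the condition stays in the `Filter` node above the scan. This is only a diagnostic sketch (it reuses the `F` import from the script above), not a proposed workaround:

```
# Wrapping the nested column in a no-op arithmetic expression is not something
# that can be converted into an Iceberg scan filter, so the predicate should
# stay in the Filter node above the BatchScan (check the filters= line in the
# explain output to confirm). Comparing df.explain(mode="formatted") with and
# without this change shows whether the ColumnarToRow node reappears once no
# nested filter is pushed down.
df = spark.read.format("iceberg").load("local.jcc.test")
df = df.select("id").where((F.col("source.start_num") + F.lit(0)) == 300000)
df.explain(mode="formatted")
```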