aokolnychyi commented on code in PR #8872:
URL: https://github.com/apache/iceberg/pull/8872#discussion_r1394989386
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -18,27 +18,76 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder {
+class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<String> metaColumns = Lists.newArrayList();

Review Comment:
   Is it a deliberate choice to separate this variable from the block of other final variables above?

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -18,27 +18,76 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder {
+class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<String> metaColumns = Lists.newArrayList();
+
+  private Schema schema = null;
+
   SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf);
+  }
+
+  @Override
+  public void pruneColumns(StructType requestedSchema) {
+    StructType requestedProjection =
+        new StructType(
+            Stream.of(requestedSchema.fields())
+                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+                .toArray(StructField[]::new));
+
+    // the projection should include all columns that will be returned, including those only used in
Review Comment:
   This comment is redundant and misleading; there is no filter pushdown here. Let's remove it.

##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestMetaColumnProjectionWithStageScan extends SparkExtensionsTestBase {
+
+  public TestMetaColumnProjectionWithStageScan(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties()
+      }
+    };
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  private <T extends ScanTask> void stageTask(
+      Table tab, String fileSetID, CloseableIterable<T> tasks) {
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+  }
+
+  @Test
+  public void testReadStageTableMeta() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"));
+
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    table.refresh();
+    String tableLocation = table.location();
+
+    try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(table, fileSetID, tasks);
+      Dataset<Row> scanDF2 =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.FILE_OPEN_COST, "0")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .load(tableLocation);
+
+      Assertions.assertThat(scanDF2.columns().length).isEqualTo(2);

Review Comment:
   None of the tests validates the values of the metadata columns, and it concerns me. Can we fix it?
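   One possible shape for such an assertion, sketched against the variables already in scope in `testReadStageTableMeta` (it assumes the coalesced single-file append above yields `_pos` values 0..3 in `id` order; this is illustrative, not the PR's actual fix):
   ```java
   // Illustrative only: project a metadata column from the staged scan and verify its
   // values, not just the column count. Assumes _pos is 0..3 in id order for the single
   // data file written above.
   Dataset<Row> metaDF =
       spark
           .read()
           .format("iceberg")
           .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
           .load(tableLocation)
           .select("id", "data", "_pos");

   List<Row> rows = metaDF.sort("id").collectAsList();
   Assertions.assertThat(rows).hasSize(4);
   for (int i = 0; i < rows.size(); i++) {
     Assertions.assertThat(rows.get(i).getLong(0)).isEqualTo(i + 1L); // id
     Assertions.assertThat(rows.get(i).getString(1)).isNotNull(); // data
     Assertions.assertThat(rows.get(i).getLong(2)).isEqualTo((long) i); // _pos within the file
   }
   ```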
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -18,27 +18,76 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder {
+class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<String> metaColumns = Lists.newArrayList();
+
+  private Schema schema = null;
+
   SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf);
+  }
+
+  @Override
+  public void pruneColumns(StructType requestedSchema) {
+    StructType requestedProjection =
+        new StructType(
+            Stream.of(requestedSchema.fields())
+                .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+                .toArray(StructField[]::new));
+
+    // the projection should include all columns that will be returned, including those only used in
+    // filters
+    this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
+
+    Stream.of(requestedSchema.fields())
+        .map(StructField::name)
+        .filter(MetadataColumns::isMetadataColumn)
+        .distinct()
+        .forEach(metaColumns::add);
+  }
+
+  private Schema schemaWithMetadataColumns() {
+    // metadata columns
+    List<Types.NestedField> fields =
+        metaColumns.stream()
+            .distinct()
+            .map(name -> MetadataColumns.metadataColumn(table, name))
+            .collect(Collectors.toList());
+    Schema meta = new Schema(fields);
+
+    // schema or rows returned by readers

Review Comment:
   Shall `or` be `of`?

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java:
##########
@@ -39,9 +40,8 @@ class SparkStagedScan extends SparkScan {
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
-  SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of(), null);
-
+  SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
+    super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);

Review Comment:
   I think you will need to add `readSchema()` to `equals` and `hashCode` now, like we do in other scans that support column pruning. See `SparkBatchQueryScan`.
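   A rough sketch of what that could look like, following the `SparkBatchQueryScan` pattern; the field names `taskSetId`, `splitSize`, `splitLookback`, and `openFileCost` are assumed from the existing `SparkStagedScan`, and `java.util.Objects` would need to be imported:
   ```java
   // Sketch only: include the pruned projection (readSchema()) when comparing scans so two
   // staged scans over the same task set with different projections are not treated as equal.
   @Override
   public boolean equals(Object other) {
     if (this == other) {
       return true;
     }

     if (other == null || getClass() != other.getClass()) {
       return false;
     }

     SparkStagedScan that = (SparkStagedScan) other;
     return table().name().equals(that.table().name())
         && readSchema().equals(that.readSchema()) // compare projected schemas
         && Objects.equals(taskSetId, that.taskSetId)
         && splitSize == that.splitSize
         && splitLookback == that.splitLookback
         && openFileCost == that.openFileCost;
   }

   @Override
   public int hashCode() {
     return Objects.hash(
         table().name(), readSchema(), taskSetId, splitSize, splitLookback, openFileCost);
   }
   ```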
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java:
##########
@@ -18,27 +18,76 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder {
+class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
 
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
+  private final List<String> metaColumns = Lists.newArrayList();
+
+  private Schema schema = null;
+
   SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
+    this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, readConf);
+    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf);
+  }
+
+  @Override
+  public void pruneColumns(StructType requestedSchema) {
+    StructType requestedProjection =

Review Comment:
   What about a helper method like this?
   ```
   @Override
   public void pruneColumns(StructType requestedSchema) {
     StructType requestedProjection = removeMetaColumns(requestedSchema);
     this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
     ...
   }
   ```
   ```
   private StructType removeMetaColumns(StructType structType) {
     return new StructType(
         Stream.of(structType.fields())
             .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
             .toArray(StructField[]::new));
   }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.