stevenzwu commented on code in PR #13310: URL: https://github.com/apache/iceberg/pull/13310#discussion_r2191288421
########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/ExtractRowLineageFromMetadata.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.function.Function; +import org.apache.iceberg.MetadataColumns; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.ProjectingInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; + +class ExtractRowLineageFromMetadata implements Function<InternalRow, InternalRow> { + private Integer rowIdOrdinal; + private Integer lastUpdatedOrdinal; + + @Override + public InternalRow apply(InternalRow meta) { + GenericInternalRow row = new GenericInternalRow(2); Review Comment: can this object be reused? basically make this a wrapper class. ########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java: ########## @@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e writeSchema, dsSchema, targetFileSize, - useFanoutWriter); + useFanoutWriter, + extractRowLineage); } } } private static class UnpartitionedDataWriter implements DataWriter<InternalRow> { private final FileWriter<InternalRow, DataWriteResult> delegate; private final FileIO io; + private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata; private UnpartitionedDataWriter( SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, PartitionSpec spec, - long targetFileSize) { + long targetFileSize, + Function<InternalRow, InternalRow> extractRowLineageFromMetadata) { this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); this.io = io; + this.extractRowLineageFromMetadata = extractRowLineageFromMetadata; } @Override public void write(InternalRow record) throws IOException { - delegate.write(record); + write(null, record); + } + + @Override + public void write(InternalRow meta, InternalRow record) throws IOException { + InternalRow rowLineage = extractRowLineageFromMetadata.apply(meta); + InternalRow recordWithLineage = + rowLineage == null ? record : new JoinedRow(record, rowLineage); Review Comment: does `JoinedRow` mean that the row lineage metadata columns are always the last two fields in the write schema? Would this still work if there are new metadata columns added to the end of the schema in the future? ########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java: ########## @@ -116,16 +119,36 @@ public WriteBuilder overwrite(Filter[] filters) { @Override public Write build() { - // Validate - Schema writeSchema = validateOrMergeWriteSchema(table, dsSchema, writeConf); + // The write schema should only include row lineage in the output if it's an overwrite + // operation. + // In any other case, only null row IDs and sequence numbers would be produced which + // means the row lineage columns can be excluded from the output files + boolean writeIncludesRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles; + StructType sparkWriteSchema = dsSchema; + if (writeIncludesRowLineage) { + sparkWriteSchema = sparkWriteSchema.add("_row_id", LongType$.MODULE$); Review Comment: do you plan to use the constant here like the next line? ########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java: ########## @@ -20,17 +20,40 @@ import org.apache.spark.sql.connector.catalog.MetadataColumn; import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.MetadataBuilder; public class SparkMetadataColumn implements MetadataColumn { private final String name; private final DataType dataType; private final boolean isNullable; + private final boolean preserveOnReinsert; + private final boolean preserveOnUpdate; + private final boolean preserveOnDelete; public SparkMetadataColumn(String name, DataType dataType, boolean isNullable) { + this( + name, + dataType, + isNullable, + MetadataColumn.PRESERVE_ON_REINSERT_DEFAULT, + MetadataColumn.PRESERVE_ON_UPDATE_DEFAULT, + MetadataColumn.PRESERVE_ON_DELETE_DEFAULT); + } + + public SparkMetadataColumn( + String name, + DataType dataType, + boolean isNullable, Review Comment: nit: with 4 boolean args back to back. it is hard to see what each arg does at caller. should we switch to builder API ########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java: ########## @@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e writeSchema, dsSchema, targetFileSize, - useFanoutWriter); + useFanoutWriter, + extractRowLineage); } } } private static class UnpartitionedDataWriter implements DataWriter<InternalRow> { private final FileWriter<InternalRow, DataWriteResult> delegate; private final FileIO io; + private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata; private UnpartitionedDataWriter( SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, PartitionSpec spec, - long targetFileSize) { + long targetFileSize, + Function<InternalRow, InternalRow> extractRowLineageFromMetadata) { this.delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null); this.io = io; + this.extractRowLineageFromMetadata = extractRowLineageFromMetadata; } @Override public void write(InternalRow record) throws IOException { - delegate.write(record); + write(null, record); Review Comment: nit: can we switch the order of the arg so that the 2nd arg is null / optioinal ########## spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java: ########## @@ -687,8 +690,14 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e .writeProperties(writeProperties) .build(); + Function<InternalRow, InternalRow> extractRowLineage = + writeSchema.findField(MetadataColumns.ROW_ID.fieldId()) != null Review Comment: there are two advantages of constructing the `ExtractRowLineageFromMetadata` with a `writeSchema`. * discover the rowId and lastUpdatedSeqNum metadata columns in the constructor * move this condition also inside -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org