amogh-jahagirdar commented on code in PR #13310: URL: https://github.com/apache/iceberg/pull/13310#discussion_r2146309779
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java:
##########
@@ -116,16 +119,36 @@ public WriteBuilder overwrite(Filter[] filters) {
 
   @Override
   public Write build() {
-    // Validate
-    Schema writeSchema = validateOrMergeWriteSchema(table, dsSchema, writeConf);
+    // The write schema should only include row lineage in the output if it's an overwrite
+    // operation.
+    // In any other case, only null row IDs and sequence numbers would be produced which
+    // means the row lineage columns can be excluded from the output files
+    boolean writeIncludesRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles;
+    StructType sparkWriteSchema = dsSchema;
+    if (writeIncludesRowLineage) {
+      sparkWriteSchema = sparkWriteSchema.add("_row_id", LongType$.MODULE$);
+      sparkWriteSchema =
+          sparkWriteSchema.add(
+              MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), LongType$.MODULE$);
+    }
+

Review Comment:
   Separate helper method `updateSparkWriteSchemaWithLineage` would make it clearer

##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +706,83 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             writeSchema,
             dsSchema,
             targetFileSize,
-            useFanoutWriter);
+            useFanoutWriter,
+            persistLineageFields);
       }
     }
   }
 
-  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+  private abstract static class DataWriterWithLineage implements DataWriter<InternalRow> {
+    private Integer rowIdOrdinal = null;
+    private Integer lastUpdatedOrdinal = null;
+
+    protected InternalRow lineageRow(InternalRow meta) {
+      GenericInternalRow row = new GenericInternalRow(2);
+      row.setNullAt(0);
+      row.setNullAt(1);
+
+      if (meta == null) {
+        return row;
+      }
+
+      // Ordinals are cached
+      if (rowIdOrdinal != null && lastUpdatedOrdinal != null) {
+        setIfNotNull(row, 0, meta, rowIdOrdinal);
+        setIfNotNull(row, 1, meta, lastUpdatedOrdinal);

Review Comment:
   I'll ideally need to figure out a shared abstraction for both the MoR DeltaWriter impl and CoW so we can share the logic to derive the metadata values instead of duplicating the logic
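
   For illustration only, here is a rough sketch of what a shared abstraction for deriving the lineage values might look like. It is not what the PR implements. `LineageExtractor` is a hypothetical name, and `List<String>` and `Object[]` are plain-Java stand-ins for Spark's `StructType` and `InternalRow` so the sketch stays self-contained. The `_last_updated_sequence_number` column name is assumed to be what `MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()` returns.

```java
import java.util.List;

/**
 * Hypothetical helper that both the CoW and MoR writers could delegate to.
 * List<String> and Object[] stand in for Spark's StructType and InternalRow.
 */
final class LineageExtractor {
  static final String ROW_ID = "_row_id";
  // Assumed column name for MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.
  static final String LAST_UPDATED_SEQ = "_last_updated_sequence_number";

  private final int rowIdOrdinal;
  private final int lastUpdatedOrdinal;

  LineageExtractor(List<String> metadataFieldNames) {
    // Resolve the ordinals once up front; indexOf returns -1 when a column is absent.
    this.rowIdOrdinal = metadataFieldNames.indexOf(ROW_ID);
    this.lastUpdatedOrdinal = metadataFieldNames.indexOf(LAST_UPDATED_SEQ);
  }

  /** Returns {rowId, lastUpdatedSequenceNumber}; a slot is null when the value is unknown. */
  Long[] lineage(Object[] meta) {
    Long[] result = new Long[2];
    if (meta == null) {
      return result;
    }
    result[0] = valueAt(meta, rowIdOrdinal);
    result[1] = valueAt(meta, lastUpdatedOrdinal);
    return result;
  }

  private static Long valueAt(Object[] meta, int ordinal) {
    if (ordinal < 0 || ordinal >= meta.length) {
      return null;
    }
    Object value = meta[ordinal];
    return value instanceof Long ? (Long) value : null;
  }
}
```

   With something along these lines, each writer would hold one extractor per metadata schema and call `lineage(meta)` per row, so the ordinal lookup and null handling would live in a single place instead of being duplicated across the two write paths.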