amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2193334597
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########

@@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             writeSchema,
             dsSchema,
             targetFileSize,
-            useFanoutWriter);
+            useFanoutWriter,
+            extractRowLineage);
       }
     }
   }

   private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
     private final FileWriter<InternalRow, DataWriteResult> delegate;
     private final FileIO io;
+    private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata;

     private UnpartitionedDataWriter(
         SparkFileWriterFactory writerFactory,
         OutputFileFactory fileFactory,
         FileIO io,
         PartitionSpec spec,
-        long targetFileSize) {
+        long targetFileSize,
+        Function<InternalRow, InternalRow> extractRowLineageFromMetadata) {
       this.delegate =
           new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
+      this.extractRowLineageFromMetadata = extractRowLineageFromMetadata;
     }

     @Override
     public void write(InternalRow record) throws IOException {
-      delegate.write(record);
+      write(null, record);
+    }
+
+    @Override
+    public void write(InternalRow meta, InternalRow record) throws IOException {
+      InternalRow rowLineage = extractRowLineageFromMetadata.apply(meta);
+      InternalRow recordWithLineage =
+          rowLineage == null ? record : new JoinedRow(record, rowLineage);

Review Comment:
   So yes, `JoinedRow` will put whatever metadata contents we extract at the end of the overall record, which is what gets persisted. At the moment we only extract and persist the lineage fields, since those are what we care about, but if we ever need to persist other metadata columns in the future, we could simply change the extraction logic to include them and it should just work.
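To make the concatenation behavior concrete, here is a minimal sketch using simplified, hypothetical stand-ins for Spark's `InternalRow` and `JoinedRow` (the classes `GenericRow` and `JoinedRowSketch` below are illustrative only, not the real Spark types): the joined row presents two rows as one concatenated row without copying either side's fields, so the extracted lineage columns land at the end of the record.

```java
// Hypothetical, simplified stand-in for a generic row of values.
class GenericRow {
  private final Object[] values;

  GenericRow(Object... values) {
    this.values = values;
  }

  int numFields() {
    return values.length;
  }

  Object get(int ordinal) {
    return values[ordinal];
  }
}

// Mirrors the JoinedRow idea: fields of `left` come first, then fields of
// `right`, with no copying of either side's values.
class JoinedRowSketch {
  private final GenericRow left;
  private final GenericRow right;

  JoinedRowSketch(GenericRow left, GenericRow right) {
    this.left = left;
    this.right = right;
  }

  int numFields() {
    return left.numFields() + right.numFields();
  }

  // Ordinals below left.numFields() read from the record; the rest read from
  // the appended metadata (e.g. the extracted lineage fields).
  Object get(int ordinal) {
    return ordinal < left.numFields()
        ? left.get(ordinal)
        : right.get(ordinal - left.numFields());
  }
}
```

With this shape, widening what gets persisted is purely a matter of changing what the extraction function returns for the right-hand row.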
Let me know if that makes sense @stevenzwu

##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########

@@ -736,7 +769,14 @@ public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOE

     @Override
     public void insert(InternalRow row) throws IOException {
-      delegate.insert(row, dataSpec, null);
+      reinsert(null, row);
+    }
+
+    @Override
+    public void reinsert(InternalRow meta, InternalRow row) throws IOException {

Review Comment:
   You're right, I was also looking at how we can project fields from a row without copying, and I saw this pattern too. I will incorporate it in my next update!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
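As an appendix to the second comment: the "project fields from a row without copying" pattern can be sketched as follows. The classes `SimpleRow` and `ProjectingRow` below are hypothetical illustrations (not Spark or Iceberg types); the idea is to wrap the source row and remap ordinals rather than materialize a new row per record.

```java
// Hypothetical, simplified row of values used as the projection source.
class SimpleRow {
  private final Object[] values;

  SimpleRow(Object... values) {
    this.values = values;
  }

  int numFields() {
    return values.length;
  }

  Object get(int ordinal) {
    return values[ordinal];
  }
}

// Projects a subset of the wrapped row's fields by ordinal remapping; the
// wrapper is reusable across records and never copies field values.
class ProjectingRow {
  private final int[] ordinalMap; // projected ordinal -> source ordinal
  private SimpleRow source;       // swapped per record; no copies made

  ProjectingRow(int... ordinalMap) {
    this.ordinalMap = ordinalMap;
  }

  ProjectingRow wrap(SimpleRow row) {
    this.source = row;
    return this;
  }

  int numFields() {
    return ordinalMap.length;
  }

  Object get(int ordinal) {
    return source.get(ordinalMap[ordinal]);
  }
}
```

Reusing one mutable wrapper per writer task is the usual way this pattern avoids per-record allocation on hot write paths.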