amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2193334597


##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +708,41 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             writeSchema,
             dsSchema,
             targetFileSize,
-            useFanoutWriter);
+            useFanoutWriter,
+            extractRowLineage);
       }
     }
   }
 
   private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
     private final FileWriter<InternalRow, DataWriteResult> delegate;
     private final FileIO io;
+    private final Function<InternalRow, InternalRow> extractRowLineageFromMetadata;
 
     private UnpartitionedDataWriter(
         SparkFileWriterFactory writerFactory,
         OutputFileFactory fileFactory,
         FileIO io,
         PartitionSpec spec,
-        long targetFileSize) {
+        long targetFileSize,
+        Function<InternalRow, InternalRow> extractRowLineageFromMetadata) {
       this.delegate =
           new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
+      this.extractRowLineageFromMetadata = extractRowLineageFromMetadata;
     }
 
     @Override
     public void write(InternalRow record) throws IOException {
-      delegate.write(record);
+      write(null, record);
+    }
+
+    @Override
+    public void write(InternalRow meta, InternalRow record) throws IOException {
+      InternalRow rowLineage = extractRowLineageFromMetadata.apply(meta);
+      InternalRow recordWithLineage =
+          rowLineage == null ? record : new JoinedRow(record, rowLineage);

Review Comment:
   Yes, JoinedRow appends whatever metadata fields we extract to the end of the 
record that gets persisted. Right now we only extract and persist the lineage 
fields, since that's all we need. If a future use case requires persisting 
other metadata columns, we could change the extraction logic to include them 
and it should just work.
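
For illustration, here is a minimal sketch of that append-at-the-end behavior. It assumes a simplified `Row` interface instead of Spark's `InternalRow`, and a hand-written `Joined` view instead of Spark's `JoinedRow`. All class names and the extractor lambda are hypothetical and exist only for this example.

```java
import java.util.function.Function;

public class JoinedRowSketch {
  /** Minimal read-only row; a stand-in for Spark's InternalRow (assumption). */
  interface Row {
    int numFields();

    Object get(int ordinal);
  }

  /** Array-backed row used only for this illustration. */
  static final class ArrayRow implements Row {
    private final Object[] values;

    ArrayRow(Object... values) {
      this.values = values;
    }

    public int numFields() {
      return values.length;
    }

    public Object get(int ordinal) {
      return values[ordinal];
    }
  }

  /** View exposing the left row's fields followed by the right row's fields. */
  static final class Joined implements Row {
    private final Row left;
    private final Row right;

    Joined(Row left, Row right) {
      this.left = left;
      this.right = right;
    }

    public int numFields() {
      return left.numFields() + right.numFields();
    }

    public Object get(int ordinal) {
      // Ordinals past the end of the left row are resolved against the right row.
      int leftSize = left.numFields();
      return ordinal < leftSize ? left.get(ordinal) : right.get(ordinal - leftSize);
    }
  }

  /** Mirrors the null check in the diff: with no lineage, the record is used as-is. */
  static Row withLineage(Row meta, Row record, Function<Row, Row> extractLineage) {
    Row lineage = extractLineage.apply(meta);
    return lineage == null ? record : new Joined(record, lineage);
  }

  public static void main(String[] args) {
    // Hypothetical extractor: takes the first two metadata fields as the lineage fields.
    Function<Row, Row> extractor =
        meta -> meta == null ? null : new ArrayRow(meta.get(0), meta.get(1));

    Row record = new ArrayRow("a", 1);
    Row meta = new ArrayRow(100L, 7L, "other-metadata");

    Row joined = withLineage(meta, record, extractor);
    for (int i = 0; i < joined.numFields(); i++) {
      System.out.println(i + ": " + joined.get(i));
    }
  }
}
```

In this sketch, persisting an extra metadata column would only require changing the extractor lambda; the joining step stays the same.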
   
   Let me know if that makes sense, @stevenzwu.



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -736,7 +769,14 @@ public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOE
 
     @Override
     public void insert(InternalRow row) throws IOException {
-      delegate.insert(row, dataSpec, null);
+      reinsert(null, row);
+    }
+
+    @Override
+    public void reinsert(InternalRow meta, InternalRow row) throws IOException {

Review Comment:
   You're right. I was also looking at how to project fields from a row 
without copying, and I saw this pattern too. I will incorporate it in my next 
update!
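
As a rough illustration of projecting without copying, one common approach is a reusable wrapper that remaps ordinals onto the underlying row. This is a sketch under assumptions, not necessarily the pattern referred to above: `Row`, `ArrayRow`, and `Projection` are hypothetical names that stand in for the real Spark and Iceberg types.

```java
public class ProjectedRowSketch {
  /** Minimal read-only row; a stand-in for Spark's InternalRow (assumption). */
  interface Row {
    int numFields();

    Object get(int ordinal);
  }

  /** Array-backed row used only for this illustration. */
  static final class ArrayRow implements Row {
    private final Object[] values;

    ArrayRow(Object... values) {
      this.values = values;
    }

    public int numFields() {
      return values.length;
    }

    public Object get(int ordinal) {
      return values[ordinal];
    }
  }

  /** Reusable view that exposes only the selected ordinals of the wrapped row. */
  static final class Projection implements Row {
    private final int[] ordinals;
    private Row wrapped;

    Projection(int... ordinals) {
      this.ordinals = ordinals;
    }

    /** Points the view at a new row; no field values are copied. */
    Projection wrap(Row row) {
      this.wrapped = row;
      return this;
    }

    public int numFields() {
      return ordinals.length;
    }

    public Object get(int ordinal) {
      // Translate the projected position into the wrapped row's position.
      return wrapped.get(ordinals[ordinal]);
    }
  }

  public static void main(String[] args) {
    // Hypothetical layout: the last two metadata fields are the ones to keep.
    Projection lineageOnly = new Projection(2, 3);
    Row meta = new ArrayRow("spec", "partition", 100L, 7L);

    Row projected = lineageOnly.wrap(meta);
    for (int i = 0; i < projected.numFields(); i++) {
      System.out.println(i + ": " + projected.get(i));
    }
  }
}
```

Because the view holds only a reference and an ordinal mapping, the same `Projection` instance can be re-pointed at each incoming row with `wrap`.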



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

