amogh-jahagirdar commented on code in PR #13310:
URL: https://github.com/apache/iceberg/pull/13310#discussion_r2146309779


##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java:
##########
@@ -116,16 +119,36 @@ public WriteBuilder overwrite(Filter[] filters) {
 
   @Override
   public Write build() {
-    // Validate
-    Schema writeSchema = validateOrMergeWriteSchema(table, dsSchema, writeConf);
+    // The write schema should only include row lineage in the output if it's an overwrite
+    // operation.
+    // In any other case, only null row IDs and sequence numbers would be produced which
+    // means the row lineage columns can be excluded from the output files
+    boolean writeIncludesRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles;
+    StructType sparkWriteSchema = dsSchema;
+    if (writeIncludesRowLineage) {
+      sparkWriteSchema = sparkWriteSchema.add("_row_id", LongType$.MODULE$);
+      sparkWriteSchema =
+          sparkWriteSchema.add(
+              MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), LongType$.MODULE$);
+    }
+

Review Comment:
   A separate helper method, `updateSparkWriteSchemaWithLineage`, would make this clearer.
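   For illustration, a minimal sketch of what such a helper might look like. It uses plain Java stand-ins so it runs on its own: the `Field` record and `List<Field>` replace Spark's `StructType`, and the column names `_row_id` and `_last_updated_sequence_number` are assumed to match Iceberg's `MetadataColumns` constants. The method name comes from the suggestion above; its parameters are hypothetical. Like `StructType.add`, it returns a new schema rather than mutating the input.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class LineageSchemaSketch {
  // Hypothetical stand-in for a Spark StructField: just a name and a type label.
  record Field(String name, String type) {}

  // Assumed to match the names of Iceberg's row lineage metadata columns.
  static final String ROW_ID = "_row_id";
  static final String LAST_UPDATED_SEQ = "_last_updated_sequence_number";

  // Hypothetical helper: appends the two lineage columns only when the table
  // supports row lineage AND the write is an overwrite; otherwise the lineage
  // values would all be null, so the input schema is returned unchanged.
  static List<Field> updateSparkWriteSchemaWithLineage(
      List<Field> dsSchema, boolean supportsRowLineage, boolean overwriteFiles) {
    if (!(supportsRowLineage && overwriteFiles)) {
      return dsSchema;
    }
    List<Field> withLineage = new ArrayList<>(dsSchema);
    withLineage.add(new Field(ROW_ID, "long"));
    withLineage.add(new Field(LAST_UPDATED_SEQ, "long"));
    return Collections.unmodifiableList(withLineage);
  }

  public static void main(String[] args) {
    List<Field> ds = List.of(new Field("id", "int"), new Field("data", "string"));
    System.out.println(updateSparkWriteSchemaWithLineage(ds, true, true).size()); // 4
    System.out.println(updateSparkWriteSchemaWithLineage(ds, true, false).size()); // 2
  }
}
```

   Pulling the condition and both `add` calls into one method keeps `build()` focused on choosing the write path, and the overwrite/non-overwrite cases can be tested without constructing a full `SparkWriteBuilder`.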



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -699,29 +706,83 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
             writeSchema,
             dsSchema,
             targetFileSize,
-            useFanoutWriter);
+            useFanoutWriter,
+            persistLineageFields);
       }
     }
   }
 
-  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+  private abstract static class DataWriterWithLineage implements DataWriter<InternalRow> {
+    private Integer rowIdOrdinal = null;
+    private Integer lastUpdatedOrdinal = null;
+
+    protected InternalRow lineageRow(InternalRow meta) {
+      GenericInternalRow row = new GenericInternalRow(2);
+      row.setNullAt(0);
+      row.setNullAt(1);
+
+      if (meta == null) {
+        return row;
+      }
+
+      // Ordinals are cached
+      if (rowIdOrdinal != null && lastUpdatedOrdinal != null) {
+        setIfNotNull(row, 0, meta, rowIdOrdinal);
+        setIfNotNull(row, 1, meta, lastUpdatedOrdinal);

Review Comment:
   Ideally, I'll find a shared abstraction for both the merge-on-read (MoR) `DeltaWriter` implementation and the copy-on-write (CoW) data writers, so the logic that derives the metadata values is shared instead of duplicated.
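   One possible shape for that shared abstraction, sketched under assumptions: `LineageProjector` is a hypothetical name, `Object[]` stands in for Spark's `InternalRow`, and a list of column names stands in for the metadata schema. Both the CoW data writers and the MoR `DeltaWriter` could delegate to a single instance instead of each carrying its own ordinal-caching code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical shared helper. Object[] stands in for Spark's InternalRow and a
// list of column names stands in for the metadata schema.
final class LineageProjector {
  // Assumed to match the names of Iceberg's row lineage metadata columns.
  static final String ROW_ID = "_row_id";
  static final String LAST_UPDATED_SEQ = "_last_updated_sequence_number";

  // Ordinals are resolved once, when the projector is built, not per row.
  // A value of -1 means the column is absent from the metadata schema.
  private final int rowIdOrdinal;
  private final int lastUpdatedOrdinal;

  LineageProjector(List<String> metadataColumnNames) {
    this.rowIdOrdinal = metadataColumnNames.indexOf(ROW_ID);
    this.lastUpdatedOrdinal = metadataColumnNames.indexOf(LAST_UPDATED_SEQ);
  }

  // Returns {rowId, lastUpdatedSequenceNumber}. A slot stays null when the
  // metadata row is absent, the column is missing, or the value itself is null.
  Object[] lineageRow(Object[] meta) {
    Object[] row = new Object[2];
    if (meta == null) {
      return row;
    }
    row[0] = valueAt(meta, rowIdOrdinal);
    row[1] = valueAt(meta, lastUpdatedOrdinal);
    return row;
  }

  private static Object valueAt(Object[] meta, int ordinal) {
    return ordinal >= 0 && ordinal < meta.length ? meta[ordinal] : null;
  }

  public static void main(String[] args) {
    LineageProjector projector =
        new LineageProjector(List.of("_file", ROW_ID, LAST_UPDATED_SEQ));
    System.out.println(Arrays.toString(projector.lineageRow(new Object[] {"f.parquet", 7L, 3L}))); // [7, 3]
    System.out.println(Arrays.toString(projector.lineageRow(null))); // [null, null]
  }
}
```

   Unlike the diff, which caches the ordinals lazily from the first metadata row, this sketch resolves them in the constructor; that only works if the metadata schema is known when the writer is created, which is an assumption here.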



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

