aokolnychyi commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1137738090


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -767,4 +824,270 @@ public void close() throws IOException {
       delegate.close();
     }
   }
+
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    private String fileSetID;
+
+    private PositionDeleteBatchWrite(String fileSetID) {
+      this.fileSetID = fileSetID;
+    }
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeltaWriteFactory(
+          tableBroadcast, queryId, format, targetFileSize, writeSchema, dsSchema);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      if (cleanupOnAbort) {
+        SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+      } else {
+        LOG.warn("Skipping cleanup of written files");
+      }
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  static class PositionDeltaWriteFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final Long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+
+    PositionDeltaWriteFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema =
+          new Schema(
+              writeSchema
+                  .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+                  .type()
+                  .asStructType()
+                  .fields());
+      StructType deleteFileType =
+          new StructType(
+              new StructField[] {
+                dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+              });
+
+      SparkFileWriterFactory writerFactoryWithRow =

Review Comment:
   Let me read up on this a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to