aokolnychyi commented on code in PR #7029: URL: https://github.com/apache/iceberg/pull/7029#discussion_r1151071080
########## core/src/main/java/org/apache/iceberg/PartitionsTable.java: ########## @@ -97,6 +97,7 @@ private static StaticDataTask.Row convertPartition(Partition partition) { private static Iterable<Partition> partitions(Table table, StaticTableScan scan) { CloseableIterable<FileScanTask> tasks = planFiles(scan); + Review Comment: Unnecessary change? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); Review Comment: What about `taskSetManager` and `tasks` instead of `scanTaskSetManager` and `scanTasks` for shorter vars? Some of the lines below would fit on one line. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
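As a point of reference for the renaming question above, the shorter names would read roughly as follows (rename only, no behavior change; the names follow the reviewer's suggestion):
```
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
// with the shorter names, the checkNotNull and stream() calls below fit on single lines
```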
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(scanTasks, "no scan tasks found for %s", fileSetId); Review Comment: nit: Shall this error message start with a capital letter like others? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); Review Comment: Shall we define this var at the beginning of the method as it is not the first time we refer to it? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class BaseFileRewriteCoordinator<F extends ContentFile<F>> { + + private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class); + + private final Map<Pair<String, String>, Set<F>> resultMap = Maps.newConcurrentMap(); + + /** + * Called to persist the output of a rewrite action for a specific group. Since the write is done + * via a Spark Datasource, we have to propagate the result through this side-effect call. 
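A sketch of the reordering asked about above, with `fileSetId` pulled to the top of `build()` so the null check and the later task lookup reuse the same local (illustrative only; remaining logic unchanged):
```
@Override
public Write build() {
  String fileSetId = writeConf.rewrittenFileSetId();
  Preconditions.checkArgument(
      fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");

  ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
  List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
  ...
}
```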
+ * + * @param table table where the rewrite is occurring + * @param fileSetId the id used to identify the source set of files being rewritten + * @param newFiles the new files which have been written + */ + public void stageRewrite(Table table, String fileSetId, Set<F> newFiles) { + LOG.debug( + "Staging the output for {} - fileset {} with {} files", + table.name(), + fileSetId, + newFiles.size()); + Pair<String, String> id = toId(table, fileSetId); + resultMap.put(id, newFiles); + } + + public Set<F> fetchNewFiles(Table table, String fileSetID) { Review Comment: nit: `fileSetID` -> `fileSetId` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. 
+ * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, Review Comment: nit: Redundant second `delete`? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(scanTasks, "no scan tasks found for %s", fileSetId); + + Set<Integer> specIds = + scanTasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Set<StructLike> partitions = + scanTasks.stream().map(t -> t.file().partition()).collect(Collectors.toSet()); + Preconditions.checkArgument( + specIds.size() == 1, "All scan tasks of %s are expected to have same spec id", fileSetId); + Preconditions.checkArgument( + partitions.size() == 1, "All scan tasks of %s are expected to have the same partition"); + int specId = scanTasks.get(0).spec().specId(); Review Comment: What about having helper methods to simplify `build`? ``` // all files of rewrite group have same and partition and spec id ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId); Preconditions.checkNotNull(tasks, "No tasks found for %s", fileSetId); PartitionSpec spec = spec(fileSetId, tasks); StructLike partition = partition(fileSetId, spec, tasks); return ... ``` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
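The `spec(fileSetId, tasks)` and `partition(fileSetId, spec, tasks)` helpers referenced in the sketch above are not spelled out; one possible shape, reusing the validation already present in `build()` (helper names follow the sketch and are otherwise hypothetical):
```
private PartitionSpec spec(String fileSetId, List<PositionDeletesScanTask> tasks) {
  Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
  Preconditions.checkArgument(
      specIds.size() == 1, "All scan tasks of %s are expected to have the same spec id", fileSetId);
  return tasks.get(0).spec();
}

// spec is kept in the signature only to match the sketch above
private StructLike partition(String fileSetId, PartitionSpec spec, List<PositionDeletesScanTask> tasks) {
  Set<StructLike> partitions = tasks.stream().map(t -> t.file().partition()).collect(Collectors.toSet());
  Preconditions.checkArgument(
      partitions.size() == 1, "All scan tasks of %s are expected to have the same partition", fileSetId);
  return tasks.get(0).file().partition();
}
```
Extracting these would also require a `PartitionSpec` import in the builder.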
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( Review Comment: I don't think we write a delta here. What about `PositionDeletesWriterFactory` or something? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = + new Schema( + writeSchema + .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + .type() + .asStructType() + .fields()); + StructType deleteSparkType = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + }); + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteRowSchema(positionDeleteRowSchema) + .positionDeleteSparkType(deleteSparkType) + .build(); + + StructType deleteSparkTypeWithoutRow = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + }); + SparkFileWriterFactory writerFactoryWithoutRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .build(); + + return new DeleteWriter( + table, + writerFactoryWithRow, + writerFactoryWithoutRow, + deleteFileFactory, + targetFileSize, + dsSchema, + specId, + partition); + } + } + + /** + * Writer for position deletes metadata table. + * + * <p>Iceberg specifies delete files schema as having either 'row' as an required field, or omits Review Comment: nit: `an required` -> `a required` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. + */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id Review Comment: nit: Redundant `and` before `partition and spec id`? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = Review Comment: What about 3 extra methods to simplify this block? ``` private Schema positionDeleteRowSchema() { return new Schema(...); } private StructType deleteSparkType() { return new StructType(...); } private StructType deleteSparkTypeWithoutRow() { return new StructType(...); } ``` We could directly call them when constructing `SparkFileWriterFactory`. ``` SparkFileWriterFactory writerFactoryWithRow = SparkFileWriterFactory.builderFor(table) .deleteFileFormat(format) .positionDeleteRowSchema(positionDeleteRowSchema()) .positionDeleteSparkType(deleteSparkType()) .build(); ``` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
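Filling in the bodies of the three methods sketched above, using the exact expressions already in `createWriter` (a refactor only; the computed schemas are unchanged):
```
private Schema positionDeleteRowSchema() {
  return new Schema(
      writeSchema
          .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
          .type()
          .asStructType()
          .fields());
}

private StructType deleteSparkType() {
  return new StructType(
      new StructField[] {
        dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
        dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
        dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
      });
}

private StructType deleteSparkTypeWithoutRow() {
  return new StructType(
      new StructField[] {
        dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
        dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name())
      });
}
```
These would live on the writer factory, which already holds `writeSchema` and `dsSchema` as fields.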
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = + new Schema( + writeSchema + .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + .type() + .asStructType() + .fields()); + StructType deleteSparkType = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + }); + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteRowSchema(positionDeleteRowSchema) + .positionDeleteSparkType(deleteSparkType) + .build(); + + StructType deleteSparkTypeWithoutRow = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + }); + SparkFileWriterFactory writerFactoryWithoutRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .build(); + + return new DeleteWriter( + table, + writerFactoryWithRow, + writerFactoryWithoutRow, + deleteFileFactory, + targetFileSize, + dsSchema, + specId, + partition); + } + } + + /** + * Writer for position deletes metadata table. + * + * <p>Iceberg specifies delete files schema as having either 'row' as an required field, or omits + * 'row' altogether. This is to ensure accuracy of delete file statistics on 'row' column. Hence, + * this writer, if receiving source position deletes with null and non-null rows, redirects rows + * with null 'row' to one file writer, and non-null 'row' to another file writer. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition. 
+ */ + private static class DeleteWriter implements DataWriter<InternalRow> { + private final SparkFileWriterFactory writerFactoryWithRow; + private final SparkFileWriterFactory writerFactoryWithoutRow; + private final OutputFileFactory deleteFileFactory; + private final long targetFileSize; + private final PositionDelete<InternalRow> positionDelete; + private final FileIO io; + private final PartitionSpec spec; + private final int fileOrdinal; + private final int positionOrdinal; + private final int rowOrdinal; + private final int rowSize; + private final StructLike partition; + + private ClusteredPositionDeleteWriter<InternalRow> writerWithRow; + private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow; + private boolean closed = false; + + /** + * Constructs a DeleteWriter + * + * @param table position deletes metadata table + * @param writerFactoryWithRow writer factory for deletes with non-null 'row' + * @param writerFactoryWithoutRow writer factory for deletes with null 'row' + * @param deleteFileFactory delete file factory + * @param targetFileSize target file size + * @param dsSchema schema of incoming dataset of position deletes + * @param specId partition spec id of incoming position deletes. All incoming partition deletes + * are required to have the same spec id. + * @param partition partition value of incoming position delete. All incoming partition deletes + * are required to have the same partition. + */ + DeleteWriter( + Table table, + SparkFileWriterFactory writerFactoryWithRow, + SparkFileWriterFactory writerFactoryWithoutRow, + OutputFileFactory deleteFileFactory, + long targetFileSize, + StructType dsSchema, + int specId, + StructLike partition) { + this.deleteFileFactory = deleteFileFactory; + this.targetFileSize = targetFileSize; + this.writerFactoryWithRow = writerFactoryWithRow; + this.writerFactoryWithoutRow = writerFactoryWithoutRow; + this.positionDelete = PositionDelete.create(); + this.io = table.io(); + this.spec = table.specs().get(specId); + this.partition = partition; + + this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name()); + this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name()); + + this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME); + DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType(); + Preconditions.checkArgument( + type instanceof StructType, "Expected row as struct type but was %s", type); + this.rowSize = ((StructType) type).size(); + } + + @Override + public void write(InternalRow record) throws IOException { + String file = record.getString(fileOrdinal); + long position = record.getLong(positionOrdinal); + InternalRow row = record.getStruct(rowOrdinal, rowSize); + if (row != null) { + positionDelete.set(file, position, row); + lazyWriterWithRow().write(positionDelete, spec, partition); + } else { + positionDelete.set(file, position, null); + lazyWriterWithoutRow().write(positionDelete, spec, partition); + } + } + + @Override + public WriterCommitMessage commit() throws IOException { + close(); + + List<DeleteFile> allDeleteFiles = Lists.newArrayList(); + if (writerWithRow != null) { + allDeleteFiles.addAll(writerWithRow.result().deleteFiles()); + } + if (writerWithoutRow != null) { + allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles()); + } + return new DeleteTaskCommit(allDeleteFiles); + } + + @Override + public void abort() throws IOException { + close(); + + List<DeleteFile> allDeleteFiles = 
Lists.newArrayList(); + if (writerWithRow != null) { + allDeleteFiles.addAll(writerWithRow.result().deleteFiles()); + } + if (writerWithoutRow != null) { + allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles()); + } + SparkCleanupUtil.deleteTaskFiles(io, Lists.newArrayList(allDeleteFiles)); + } + + @Override + public void close() throws IOException { + if (!closed) { + if (writerWithRow != null) { + writerWithRow.close(); + } + if (writerWithoutRow != null) { + writerWithoutRow.close(); + } + this.closed = true; + } + } + + private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() { + if (writerWithRow == null) { + writerWithRow = Review Comment: nit: We usually use `this.varName` whenever setting (only setting, not getting) a field. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
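Follow-up on the `this.varName` nit above: a minimal, untested sketch of the lazy getter with the convention applied. The constructor arguments are assumed from the surrounding fields, since the original assignment is truncated in the quote.

```
  private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() {
    if (writerWithRow == null) {
      // qualify with `this.` only when setting the field; use the plain name when reading it
      this.writerWithRow =
          new ClusteredPositionDeleteWriter<>(
              writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
    }

    return writerWithRow;
  }
```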
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(scanTasks, "no scan tasks found for %s", fileSetId); + + Set<Integer> specIds = + scanTasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Set<StructLike> partitions = + scanTasks.stream().map(t -> t.file().partition()).collect(Collectors.toSet()); + Preconditions.checkArgument( + specIds.size() == 1, "All scan tasks of %s are expected to have same spec id", fileSetId); Review Comment: Shall we include all spec IDs we got in the error message? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. 
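Follow-up on the suggestion above to include all spec IDs in the error message: one possible shape of the check. The extra `%s` argument is the only change; `Preconditions` formats additional arguments positionally.

```
    Preconditions.checkArgument(
        specIds.size() == 1,
        "All scan tasks of %s are expected to have the same spec id, but got %s",
        fileSetId,
        specIds);
```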
+ * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. + */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(scanTasks, "no scan tasks found for %s", fileSetId); + + Set<Integer> specIds = + scanTasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Set<StructLike> partitions = Review Comment: Shall we use `StructLikeSet` for partitions? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. 
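Follow-up on the `StructLikeSet` suggestion above: a minimal sketch, assuming the spec-id check runs first so the first task's partition type applies to the whole group. It relies on `StructLikeSet.create` and `PartitionSpec#partitionType`, and would need imports for `org.apache.iceberg.util.StructLikeSet` and `org.apache.iceberg.types.Types`.

```
    Types.StructType partitionType = scanTasks.get(0).spec().partitionType();
    StructLikeSet partitions = StructLikeSet.create(partitionType);
    scanTasks.forEach(task -> partitions.add(task.file().partition()));
```

A `StructLikeSet` compares partition tuples by value rather than by object identity, which is the reason to prefer it over a plain `Set<StructLike>` here.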
Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. + */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + Preconditions.checkArgument( + writeConf.rewrittenFileSetId() != null, + "position_deletes table can only be written by RewriteDeleteFiles"); + + // all files of rewrite group have same and partition and spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + String fileSetId = writeConf.rewrittenFileSetId(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(scanTasks, "no scan tasks found for %s", fileSetId); + + Set<Integer> specIds = + scanTasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Set<StructLike> partitions = + scanTasks.stream().map(t -> t.file().partition()).collect(Collectors.toSet()); + Preconditions.checkArgument( + specIds.size() == 1, "All scan tasks of %s are expected to have same spec id", fileSetId); + Preconditions.checkArgument( + partitions.size() == 1, "All scan tasks of %s are expected to have the same partition"); Review Comment: We define `%s` but don't include a value for `fileSetId`. Also, what about including the set of partitions we got? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
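Follow-up on the two message nits above (the dangling `%s` with no argument, and surfacing the offending values): one possible shape of the fixed partition check.

```
    Preconditions.checkArgument(
        partitions.size() == 1,
        "All scan tasks of %s are expected to have the same partition, but got %s",
        fileSetId,
        partitions);
```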
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite} + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. + */ +public class SparkPositionDeletesRewrite implements Write { + private static final Logger LOG = LoggerFactory.getLogger(SparkPositionDeletesRewrite.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + private final int specId; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.dataFileFormat(); + this.targetFileSize = writeConf.targetDataFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + + // all files of rewrite group have same spec id + ScanTaskSetManager scanTaskSetManager = ScanTaskSetManager.get(); + List<PositionDeletesScanTask> scanTasks = scanTaskSetManager.fetchTasks(table, fileSetId); + this.specId = scanTasks.get(0).spec().specId(); + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, queryId, format, targetFileSize, writeSchema, dsSchema, specId); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = + new Schema( + writeSchema + .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + .type() + .asStructType() + .fields()); + StructType deleteFileType = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + }); + + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) Review Comment: That place is different as it writes a delta (delete + insert). I think you should be able to drop `dataXXX` methods and it should work fine. The writer factory will use the default format for data but we are not using that anyway. We just write deletes. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java: ########## @@ -18,79 +18,23 @@ */ package org.apache.iceberg.spark; -import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.DataFile; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class FileRewriteCoordinator { +public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> { - private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class); private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator(); - private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap(); - private FileRewriteCoordinator() {} public static FileRewriteCoordinator get() { return INSTANCE; } - /** - * Called to persist the output of a rewrite action for a specific group. Since the write is done - * via a Spark Datasource, we have to propagate the result through this side-effect call. 
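Follow-up on the suggestion above to drop the `dataXXX` calls from the delete-only writer factories: an untested sketch of the trimmed builder, assuming the factory's data-file defaults are irrelevant because these factories only ever write position deletes.

```
      SparkFileWriterFactory writerFactoryWithRow =
          SparkFileWriterFactory.builderFor(table)
              // data file settings omitted: this factory is used for position deletes only
              .deleteFileFormat(format)
              .positionDeleteRowSchema(positionDeleteRowSchema)
              .positionDeleteSparkType(deleteSparkType)
              .build();
```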
- * - * @param table table where the rewrite is occurring - * @param fileSetID the id used to identify the source set of files being rewritten - * @param newDataFiles the new files which have been written - */ - public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) { - LOG.debug( - "Staging the output for {} - fileset {} with {} files", - table.name(), - fileSetID, - newDataFiles.size()); - Pair<String, String> id = toID(table, fileSetID); - resultMap.put(id, newDataFiles); - } - - public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) { - Pair<String, String> id = toID(table, fileSetID); - Set<DataFile> result = resultMap.get(id); - ValidationException.check( - result != null, "No results for rewrite of file set %s in table %s", fileSetID, table); - - return result; - } - - public void clearRewrite(Table table, String fileSetID) { - LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID); - Pair<String, String> id = toID(table, fileSetID); - resultMap.remove(id); - } - - public Set<String> fetchSetIDs(Table table) { Review Comment: We should keep and deprecate this one too. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
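Follow-up on the comment above about keeping `fetchSetIDs`: a sketch of a deprecated shim left on `FileRewriteCoordinator`. It assumes the new base coordinator exposes an equivalent accessor (shown here as `fetchSetIds`, which is an assumption and does not appear in this diff).

```
  /** @deprecated use the equivalent accessor on the base coordinator instead */
  @Deprecated
  public Set<String> fetchSetIDs(Table table) {
    return fetchSetIds(table); // assumed base-class method
  }
```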
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. Review Comment: I think there is a typo in the name. Shall we also use `@link` like you do in other places? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
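Follow-up on the typo/`@link` comment above (and the `spark` vs `Spark` capitalization nit that appears below): a sketch of the corrected opening of the constructor javadoc, with the remaining `@param` tags unchanged apart from capitalizing "Spark".

```
  /**
   * Constructs a {@link SparkPositionDeletesRewrite}.
   *
   * @param spark Spark session
   * @param table instance of {@link PositionDeletesTable}
   * @param writeConf Spark write config
   * @param writeInfo Spark write info
   */
```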
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. + * + * @param spark spark session Review Comment: nit: `spark` -> `Spark` in all comments here? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { Review Comment: You are probably right. It should be OK as we don't produce a partition tuple. What about validation of timestamps? Will we have issues if the persisted row has a timestamp without a timezone? Our `SparkWriteBuilder` has the following call. ``` Preconditions.checkArgument( handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()), SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); ``` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
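Follow-up on the timestamp-validation question above: a sketch of how the same guard could be mirrored in `SparkPositionDeletesRewriteBuilder#build()`. Whether `SparkWriteConf` exposes a `handleTimestampWithoutZone()` flag in this context is an assumption, so treat the flag lookup as illustrative only.

```
    Preconditions.checkArgument(
        writeConf.handleTimestampWithoutZone()
            || !SparkUtil.hasTimestampWithoutZone(table.schema()),
        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
```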
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + Review Comment: Is this empty line intentional? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = + new Schema( + writeSchema + .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + .type() + .asStructType() + .fields()); + StructType deleteSparkType = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + }); + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteRowSchema(positionDeleteRowSchema) + .positionDeleteSparkType(deleteSparkType) + .build(); + + StructType deleteSparkTypeWithoutRow = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + }); + SparkFileWriterFactory writerFactoryWithoutRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .build(); + + return new DeleteWriter( + table, + writerFactoryWithRow, + writerFactoryWithoutRow, + deleteFileFactory, + targetFileSize, + dsSchema, + specId, + partition); + } + } + + /** + * Writer for position deletes metadata table. + * + * <p>Iceberg specifies delete files schema as having either 'row' as an required field, or omits + * 'row' altogether. This is to ensure accuracy of delete file statistics on 'row' column. Hence, + * this writer, if receiving source position deletes with null and non-null rows, redirects rows + * with null 'row' to one file writer, and non-null 'row' to another file writer. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition. 
+ */ + private static class DeleteWriter implements DataWriter<InternalRow> { + private final SparkFileWriterFactory writerFactoryWithRow; + private final SparkFileWriterFactory writerFactoryWithoutRow; + private final OutputFileFactory deleteFileFactory; + private final long targetFileSize; + private final PositionDelete<InternalRow> positionDelete; + private final FileIO io; + private final PartitionSpec spec; + private final int fileOrdinal; + private final int positionOrdinal; + private final int rowOrdinal; + private final int rowSize; + private final StructLike partition; + + private ClusteredPositionDeleteWriter<InternalRow> writerWithRow; + private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow; + private boolean closed = false; + + /** + * Constructs a DeleteWriter Review Comment: nit: `@link` and dot at the end? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete delete files. Hence, + * it assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + + private final int specId; + private final StructLike partition; + + /** + * Constructs a SparkPositionDeletesWrite. 
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeltaWriteFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeltaWriteFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = + new Schema( + writeSchema + .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + .type() + .asStructType() + .fields()); + StructType deleteSparkType = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME) + }); + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteRowSchema(positionDeleteRowSchema) + .positionDeleteSparkType(deleteSparkType) + .build(); + + StructType deleteSparkTypeWithoutRow = + new StructType( + new StructField[] { + dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()), + dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()), + }); + SparkFileWriterFactory writerFactoryWithoutRow = + SparkFileWriterFactory.builderFor(table) + .dataFileFormat(format) + .dataSchema(writeSchema) + .dataSparkType(dsSchema) + .deleteFileFormat(format) + .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .build(); + + return new DeleteWriter( + table, + writerFactoryWithRow, + writerFactoryWithoutRow, + deleteFileFactory, + targetFileSize, + dsSchema, + specId, + partition); + } + } + + /** + * Writer for position deletes metadata table. + * + * <p>Iceberg specifies delete files schema as having either 'row' as an required field, or omits + * 'row' altogether. This is to ensure accuracy of delete file statistics on 'row' column. Hence, + * this writer, if receiving source position deletes with null and non-null rows, redirects rows + * with null 'row' to one file writer, and non-null 'row' to another file writer. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition. 
+ */ + private static class DeleteWriter implements DataWriter<InternalRow> { + private final SparkFileWriterFactory writerFactoryWithRow; + private final SparkFileWriterFactory writerFactoryWithoutRow; + private final OutputFileFactory deleteFileFactory; + private final long targetFileSize; + private final PositionDelete<InternalRow> positionDelete; + private final FileIO io; + private final PartitionSpec spec; + private final int fileOrdinal; + private final int positionOrdinal; + private final int rowOrdinal; + private final int rowSize; + private final StructLike partition; + + private ClusteredPositionDeleteWriter<InternalRow> writerWithRow; + private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow; + private boolean closed = false; + + /** + * Constructs a DeleteWriter + * + * @param table position deletes metadata table + * @param writerFactoryWithRow writer factory for deletes with non-null 'row' + * @param writerFactoryWithoutRow writer factory for deletes with null 'row' + * @param deleteFileFactory delete file factory + * @param targetFileSize target file size + * @param dsSchema schema of incoming dataset of position deletes + * @param specId partition spec id of incoming position deletes. All incoming partition deletes + * are required to have the same spec id. + * @param partition partition value of incoming position delete. All incoming partition deletes + * are required to have the same partition. + */ + DeleteWriter( + Table table, + SparkFileWriterFactory writerFactoryWithRow, + SparkFileWriterFactory writerFactoryWithoutRow, + OutputFileFactory deleteFileFactory, + long targetFileSize, + StructType dsSchema, + int specId, + StructLike partition) { + this.deleteFileFactory = deleteFileFactory; + this.targetFileSize = targetFileSize; + this.writerFactoryWithRow = writerFactoryWithRow; + this.writerFactoryWithoutRow = writerFactoryWithoutRow; + this.positionDelete = PositionDelete.create(); + this.io = table.io(); + this.spec = table.specs().get(specId); + this.partition = partition; + + this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name()); + this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name()); + + this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME); + DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType(); + Preconditions.checkArgument( + type instanceof StructType, "Expected row as struct type but was %s", type); + this.rowSize = ((StructType) type).size(); + } + + @Override + public void write(InternalRow record) throws IOException { + String file = record.getString(fileOrdinal); + long position = record.getLong(positionOrdinal); + InternalRow row = record.getStruct(rowOrdinal, rowSize); + if (row != null) { + positionDelete.set(file, position, row); + lazyWriterWithRow().write(positionDelete, spec, partition); + } else { + positionDelete.set(file, position, null); + lazyWriterWithoutRow().write(positionDelete, spec, partition); + } + } + + @Override + public WriterCommitMessage commit() throws IOException { + close(); + + List<DeleteFile> allDeleteFiles = Lists.newArrayList(); Review Comment: Looks like we need this in two places, what about a common method `allDeleteFiles()`? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite position delete files. Hence, it
+ * assumes that all position deletes being rewritten come from {@link ScanTaskSetManager} and that
+ * they all have the same partition spec id and partition values.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+
+ private final JavaSparkContext sparkContext;
+ private final Table table;
+ private final String queryId;
+ private final FileFormat format;
+ private final long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+ private final String fileSetId;
+
+ private final int specId;
+ private final StructLike partition;
+
+ /**
+ * Constructs a SparkPositionDeletesRewrite.
+ * + * @param spark spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf spark write config + * @param writeInfo spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeltaWriteFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Write factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */
+ static class PositionDeltaWriteFactory implements DataWriterFactory {
+ private final Broadcast<Table> tableBroadcast;
+ private final String queryId;
+ private final FileFormat format;
+ private final Long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+ private final int specId;
+ private final StructLike partition;
+
+ PositionDeltaWriteFactory(
+ Broadcast<Table> tableBroadcast,
+ String queryId,
+ FileFormat format,
+ long targetFileSize,
+ Schema writeSchema,
+ StructType dsSchema,
+ int specId,
+ StructLike partition) {
+ this.tableBroadcast = tableBroadcast;
+ this.queryId = queryId;
+ this.format = format;
+ this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ this.specId = specId;
+ this.partition = partition;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+ Table table = tableBroadcast.value();
+
+ OutputFileFactory deleteFileFactory =
+ OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(format)
+ .operationId(queryId)
+ .suffix("deletes")
+ .build();
+
+ Schema positionDeleteRowSchema =
+ new Schema(
+ writeSchema
+ .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ .type()
+ .asStructType()
+ .fields());
+ StructType deleteSparkType =
+ new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+ });
+ SparkFileWriterFactory writerFactoryWithRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
+ .positionDeleteSparkType(deleteSparkType)
+ .build();
+
+ StructType deleteSparkTypeWithoutRow =
+ new StructType(
+ new StructField[] {
+ dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+ dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+ });
+ SparkFileWriterFactory writerFactoryWithoutRow =
+ SparkFileWriterFactory.builderFor(table)
+ .dataFileFormat(format)
+ .dataSchema(writeSchema)
+ .dataSparkType(dsSchema)
+ .deleteFileFormat(format)
+ .positionDeleteSparkType(deleteSparkTypeWithoutRow)
+ .build();
+
+ return new DeleteWriter(
+ table,
+ writerFactoryWithRow,
+ writerFactoryWithoutRow,
+ deleteFileFactory,
+ targetFileSize,
+ dsSchema,
+ specId,
+ partition);
+ }
+ }
+
+ /**
+ * Writer for position deletes metadata table.
+ *
+ * <p>Iceberg specifies the delete file schema as either having 'row' as a required field or
+ * omitting 'row' altogether. This is to ensure the accuracy of delete file statistics on the
+ * 'row' column. Hence, if this writer receives source position deletes with both null and
+ * non-null rows, it redirects rows with a null 'row' to one file writer and rows with a non-null
+ * 'row' to another file writer.
+ *
+ * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it assumes
+ * that all incoming deletes belong to the same partition.
+ */ + private static class DeleteWriter implements DataWriter<InternalRow> { + private final SparkFileWriterFactory writerFactoryWithRow; + private final SparkFileWriterFactory writerFactoryWithoutRow; + private final OutputFileFactory deleteFileFactory; + private final long targetFileSize; + private final PositionDelete<InternalRow> positionDelete; + private final FileIO io; + private final PartitionSpec spec; + private final int fileOrdinal; + private final int positionOrdinal; + private final int rowOrdinal; + private final int rowSize; + private final StructLike partition; + + private ClusteredPositionDeleteWriter<InternalRow> writerWithRow; + private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow; + private boolean closed = false; + + /** + * Constructs a DeleteWriter + * + * @param table position deletes metadata table + * @param writerFactoryWithRow writer factory for deletes with non-null 'row' + * @param writerFactoryWithoutRow writer factory for deletes with null 'row' + * @param deleteFileFactory delete file factory + * @param targetFileSize target file size + * @param dsSchema schema of incoming dataset of position deletes + * @param specId partition spec id of incoming position deletes. All incoming partition deletes + * are required to have the same spec id. + * @param partition partition value of incoming position delete. All incoming partition deletes + * are required to have the same partition. + */ + DeleteWriter( + Table table, + SparkFileWriterFactory writerFactoryWithRow, + SparkFileWriterFactory writerFactoryWithoutRow, + OutputFileFactory deleteFileFactory, + long targetFileSize, + StructType dsSchema, + int specId, + StructLike partition) { + this.deleteFileFactory = deleteFileFactory; + this.targetFileSize = targetFileSize; + this.writerFactoryWithRow = writerFactoryWithRow; + this.writerFactoryWithoutRow = writerFactoryWithoutRow; + this.positionDelete = PositionDelete.create(); + this.io = table.io(); + this.spec = table.specs().get(specId); + this.partition = partition; + + this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name()); + this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name()); + + this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME); + DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType(); + Preconditions.checkArgument( + type instanceof StructType, "Expected row as struct type but was %s", type); + this.rowSize = ((StructType) type).size(); + } + + @Override + public void write(InternalRow record) throws IOException { + String file = record.getString(fileOrdinal); + long position = record.getLong(positionOrdinal); + InternalRow row = record.getStruct(rowOrdinal, rowSize); + if (row != null) { + positionDelete.set(file, position, row); + lazyWriterWithRow().write(positionDelete, spec, partition); + } else { + positionDelete.set(file, position, null); + lazyWriterWithoutRow().write(positionDelete, spec, partition); + } + } + + @Override + public WriterCommitMessage commit() throws IOException { + close(); + + List<DeleteFile> allDeleteFiles = Lists.newArrayList(); + if (writerWithRow != null) { + allDeleteFiles.addAll(writerWithRow.result().deleteFiles()); + } + if (writerWithoutRow != null) { + allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles()); + } + return new DeleteTaskCommit(allDeleteFiles); + } + + @Override + public void abort() throws IOException { + close(); + + List<DeleteFile> allDeleteFiles = 
Lists.newArrayList();
+ if (writerWithRow != null) {
+ allDeleteFiles.addAll(writerWithRow.result().deleteFiles());
+ }
+ if (writerWithoutRow != null) {
+ allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
+ }
+ SparkCleanupUtil.deleteTaskFiles(io, Lists.newArrayList(allDeleteFiles));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ if (writerWithRow != null) {
+ writerWithRow.close();
+ }
+ if (writerWithoutRow != null) {
+ writerWithoutRow.close();
+ }
+ this.closed = true;
+ }
+ }
+
+ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() {
+ if (writerWithRow == null) {
+ writerWithRow =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+ }
+ return writerWithRow;
+ }
+
+ private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
+ if (writerWithoutRow == null) {
+ writerWithoutRow =
Review Comment: nit: `this.writerWithoutRow = ...`
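To make the nit concrete, the method above would read roughly as follows once applied; only the field assignment gains a `this.` qualifier, matching the style already used in the constructor and in `close()` (presumably the same would apply to `lazyWriterWithRow()`):

```java
private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
  if (writerWithoutRow == null) {
    // qualify the field assignment with this., as done elsewhere in DeleteWriter
    this.writerWithoutRow =
        new ClusteredPositionDeleteWriter<>(
            writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize);
  }
  return writerWithoutRow;
}
```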
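Similarly, for the earlier comment proposing a common `allDeleteFiles()` method for the collection logic duplicated in `commit()` and `abort()`: a minimal sketch of that refactor is below. It assumes the fields and imports already declared in `DeleteWriter` above; the helper name comes from the review comment, not from the PR as posted.

```java
// Collects the delete files produced by whichever lazy writers were created.
// Shared by commit() and abort() so the null checks live in one place.
private List<DeleteFile> allDeleteFiles() {
  List<DeleteFile> allDeleteFiles = Lists.newArrayList();
  if (writerWithRow != null) {
    allDeleteFiles.addAll(writerWithRow.result().deleteFiles());
  }
  if (writerWithoutRow != null) {
    allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
  }
  return allDeleteFiles;
}

@Override
public WriterCommitMessage commit() throws IOException {
  close();
  return new DeleteTaskCommit(allDeleteFiles());
}

@Override
public void abort() throws IOException {
  close();
  SparkCleanupUtil.deleteTaskFiles(io, Lists.newArrayList(allDeleteFiles()));
}
```

Behavior at both call sites stays the same; only the list-building block moves into the shared helper.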