aokolnychyi commented on code in PR #7029: URL: https://github.com/apache/iceberg/pull/7029#discussion_r1154048543
########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + String fileSetId = writeConf.rewrittenFileSetId(); + + Preconditions.checkArgument( + fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles"); + Preconditions.checkArgument( + writeConf.handleTimestampWithoutZone() + || !SparkUtil.hasTimestampWithoutZone(table.schema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); + + // all files of rewrite group have same partition and spec id + ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); + List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(tasks, "No scan tasks found for %s", fileSetId); + Preconditions.checkArgument(tasks.size() > 0, "No scan tasks found for %s", fileSetId); + + int specId = specId(fileSetId, tasks); + StructLike partition = partition(fileSetId, tasks); + + return new SparkPositionDeletesRewrite( + spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition); + } + + private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) { + Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Preconditions.checkArgument( + specIds.size() == 1, + "All scan tasks of %s are expected to have same spec id, but got %s", + fileSetId, + Joiner.on(",").join(specIds)); + return tasks.get(0).spec().specId(); + } + + private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) { + StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType()); + partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList())); + Preconditions.checkArgument( + partitions.size() == 1, + "All scan tasks of %s are expected to have the same partition", Review Comment: Did we miss `, but got %s` at the end to include partitions? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class BaseFileRewriteCoordinator<F extends ContentFile<F>> { + + private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class); Review Comment: I think we are using the wrong class for logging. It should be `BaseFileRewriteCoordinator`. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + String fileSetId = writeConf.rewrittenFileSetId(); + + Preconditions.checkArgument( + fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles"); + Preconditions.checkArgument( + writeConf.handleTimestampWithoutZone() Review Comment: I think this part would be easier to read if we define `fileSetId` and `handleTimestampWithoutZone` instance variables, similar to what we have in `SparkWriteBuilder`. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + String fileSetId = writeConf.rewrittenFileSetId(); + + Preconditions.checkArgument( + fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles"); + Preconditions.checkArgument( + writeConf.handleTimestampWithoutZone() + || !SparkUtil.hasTimestampWithoutZone(table.schema()), + SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR); + + // all files of rewrite group have same partition and spec id + ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); + List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId); + Preconditions.checkNotNull(tasks, "No scan tasks found for %s", fileSetId); + Preconditions.checkArgument(tasks.size() > 0, "No scan tasks found for %s", fileSetId); + + int specId = specId(fileSetId, tasks); + StructLike partition = partition(fileSetId, tasks); + + return new SparkPositionDeletesRewrite( + spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition); + } + + private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) { + Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); + Preconditions.checkArgument( + specIds.size() == 1, + "All scan tasks of %s are expected to have same spec id, but got %s", + fileSetId, + Joiner.on(",").join(specIds)); + return tasks.get(0).spec().specId(); + } + + private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) { + StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType()); + partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList())); Review Comment: nit: I think you can use `forEach` instead of a temp list. ``` tasks.stream().map(ContentScanTask::partition).forEach(partitions::add); ``` In any case, I like what you did here. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java: ########## @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.ClusteredPositionDeleteWriter; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * {@link Write} class for rewriting position delete files from Spark. Responsible for creating + * {@link PositionDeleteBatchWrite}. + * + * <p>This class is meant to be used for an action to rewrite position delete files. Hence, it + * assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all + * have the same partition spec id and partition values. + */ +public class SparkPositionDeletesRewrite implements Write { + + private final JavaSparkContext sparkContext; + private final Table table; + private final String queryId; + private final FileFormat format; + private final long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final String fileSetId; + private final int specId; + private final StructLike partition; + + /** + * Constructs a {@link SparkPositionDeletesRewrite}. 
+ * + * @param spark Spark session + * @param table instance of {@link PositionDeletesTable} + * @param writeConf Spark write config + * @param writeInfo Spark write info + * @param writeSchema Iceberg output schema + * @param dsSchema schema of original incoming position deletes dataset + * @param specId spec id of position deletes + * @param partition partition value of position deletes + */ + SparkPositionDeletesRewrite( + SparkSession spark, + Table table, + SparkWriteConf writeConf, + LogicalWriteInfo writeInfo, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + this.table = table; + this.queryId = writeInfo.queryId(); + this.format = writeConf.deleteFileFormat(); + this.targetFileSize = writeConf.targetDeleteFileSize(); + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.fileSetId = writeConf.rewrittenFileSetId(); + this.specId = specId; + this.partition = partition; + } + + @Override + public BatchWrite toBatch() { + return new PositionDeleteBatchWrite(); + } + + /** {@link BatchWrite} class for rewriting position deletes files from Spark */ + class PositionDeleteBatchWrite implements BatchWrite { + + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + // broadcast the table metadata as the writer factory will be sent to executors + Broadcast<Table> tableBroadcast = + sparkContext.broadcast(SerializableTableWithSize.copyOf(table)); + return new PositionDeletesWriterFactory( + tableBroadcast, + queryId, + format, + targetFileSize, + writeSchema, + dsSchema, + specId, + partition); + } + + @Override + public void commit(WriterCommitMessage[] messages) { + PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); + coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + } + + @Override + public void abort(WriterCommitMessage[] messages) { + SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages)); + } + + private List<DeleteFile> files(WriterCommitMessage[] messages) { + List<DeleteFile> files = Lists.newArrayList(); + + for (WriterCommitMessage message : messages) { + if (message != null) { + DeleteTaskCommit taskCommit = (DeleteTaskCommit) message; + files.addAll(Arrays.asList(taskCommit.files())); + } + } + + return files; + } + } + + /** + * Writer factory for position deletes metadata table. Responsible for creating {@link + * DeleteWriter}. + * + * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ + static class PositionDeletesWriterFactory implements DataWriterFactory { + private final Broadcast<Table> tableBroadcast; + private final String queryId; + private final FileFormat format; + private final Long targetFileSize; + private final Schema writeSchema; + private final StructType dsSchema; + private final int specId; + private final StructLike partition; + + PositionDeletesWriterFactory( + Broadcast<Table> tableBroadcast, + String queryId, + FileFormat format, + long targetFileSize, + Schema writeSchema, + StructType dsSchema, + int specId, + StructLike partition) { + this.tableBroadcast = tableBroadcast; + this.queryId = queryId; + this.format = format; + this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; + this.dsSchema = dsSchema; + this.specId = specId; + this.partition = partition; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId) { + Table table = tableBroadcast.value(); + + OutputFileFactory deleteFileFactory = + OutputFileFactory.builderFor(table, partitionId, taskId) + .format(format) + .operationId(queryId) + .suffix("deletes") + .build(); + + Schema positionDeleteRowSchema = positionDeleteRowSchema(); + StructType deleteSparkType = deleteSparkType(); + StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow(); + + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .dataSchema(writeSchema) Review Comment: I am not sure about these `dataXXX` methods since we are not writing any data ([here](https://github.com/apache/iceberg/pull/7029#discussion_r1151403977)). ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java: ########## @@ -18,79 +18,23 @@ */ package org.apache.iceberg.spark; -import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.DataFile; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class FileRewriteCoordinator { +public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> { - private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class); private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator(); - private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap(); - private FileRewriteCoordinator() {} public static FileRewriteCoordinator get() { return INSTANCE; } - /** - * Called to persist the output of a rewrite action for a specific group. Since the write is done - * via a Spark Datasource, we have to propagate the result through this side-effect call. 
- * - * @param table table where the rewrite is occurring - * @param fileSetID the id used to identify the source set of files being rewritten - * @param newDataFiles the new files which have been written - */ - public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) { - LOG.debug( - "Staging the output for {} - fileset {} with {} files", - table.name(), - fileSetID, - newDataFiles.size()); - Pair<String, String> id = toID(table, fileSetID); - resultMap.put(id, newDataFiles); - } - - public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) { - Pair<String, String> id = toID(table, fileSetID); - Set<DataFile> result = resultMap.get(id); - ValidationException.check( - result != null, "No results for rewrite of file set %s in table %s", fileSetID, table); - - return result; - } - - public void clearRewrite(Table table, String fileSetID) { - LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID); - Pair<String, String> id = toID(table, fileSetID); - resultMap.remove(id); - } - - public Set<String> fetchSetIDs(Table table) { Review Comment: I believe we renamed it to be `fetchSetIds` instead of `fetchSetIDs`. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link + * SparkPositionDeletesRewrite}. + * + * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an + * assumption that all incoming deletes belong to the same partition, and that incoming dataset is + * from {@link ScanTaskSetManager}. 
+ */ +public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo writeInfo; + private final StructType dsSchema; + private final Schema writeSchema; + + SparkPositionDeletesRewriteBuilder( + SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); + this.writeInfo = info; + this.dsSchema = info.schema(); + this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); + } + + @Override + public Write build() { + String fileSetId = writeConf.rewrittenFileSetId(); + + Preconditions.checkArgument( + fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles"); Review Comment: I don't think there is `RewriteDeleteFiles`. What about a more generic message like `Can only write to %s via actions", table.name()`?
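For illustration, a minimal sketch of how the check could read with the more generic message suggested above (hypothetical wording, not part of the PR; it assumes `table` here is the `position_deletes` metadata table instance):
```
Preconditions.checkArgument(
    fileSetId != null,
    "Can only write to %s via actions",
    table.name());
```
Using a `%s` template also keeps this message consistent with the other precondition messages in this class.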
