gaborkaszab commented on code in PR #12106: URL: https://github.com/apache/iceberg/pull/12106#discussion_r1931803646

##########
api/src/main/java/org/apache/iceberg/Table.java:
##########
@@ -276,6 +276,17 @@ default AppendFiles newFastAppend() {
    */
   DeleteFiles newDelete();
+  /**
+   * Create a new {@link RemoveMissingFiles remove files API} to remove files in this table and
+   * commit.
+   *
+   * @return a new {@link RemoveMissingFiles}
+   */
+  default RemoveMissingFiles newRemoveFiles() {

Review Comment:
   The name newRemoveFiles() might be confusing for users because there is already a newDelete(), and I'm not sure how clear the difference between the two is.

##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {

Review Comment:
   Initially I didn't understand why you needed this new API instead of the existing DeleteFiles API. I think I now get what the motivation is: is it because DeleteFiles can only remove data files and not delete files?
   Even now that I understand the motivation, I find this new API confusing for the following reasons:
   1. It partially overlaps with the DeleteFiles API, namely the part where we delete data files.
   2. The name might be misleading: it says "MissingFiles", but it does nothing specific to whether the files are missing or not. It just removes files from the table metadata.
   3. I'm not sure how one could use this API without the new Spark action (see my comment below).
   I probably lack experience in this area, but these might be some options we have here (just throwing out some ideas):
   - If the purpose of this new API is to provide the ability to remove DeleteFiles, then we might want to revisit the existing DeleteFiles API to see if we can extend it. There is probably a reason why it doesn't support removing DeleteFiles, but it would be nice to understand it.
   - If the DeleteFiles API can't be changed for this purpose, then in order to use this API without the Spark action, I think a single function taking a path parameter is enough. DeleteFiles already has such a function; we would 'just' have to extend it to handle delete files too, if possible. With this approach the name of this API wouldn't be accurate, because it has nothing to do with missing files.
   - This API could be smarter than simply removing files from the table. Since it's called RemoveMissingFiles, it could also detect/collect such missing files and then remove them.
     With this approach, a 'RecoverTable' class/interface name with a 'removeMissingContentFiles' function might be more descriptive.
   - With the above approach, we could later add further recovery functions to this API, for example to recover from missing metadata.json files or from missing manifest or snapshot files.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveMissingFilesSparkAction.java:
##########
@@ -0,0 +1,123 @@
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRemoveMissingFiles;
+import org.apache.iceberg.actions.RemoveMissingFiles;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+
+public class RemoveMissingFilesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RemoveMissingFilesSparkAction>
+    implements RemoveMissingFiles {
+
+  private final Table table;
+
+  RemoveMissingFilesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RemoveMissingFilesSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public RemoveMissingFiles.Result execute() {
+    String jobDesc = String.format("Removing missing files from %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("REMOVE-MISSING-FILES", jobDesc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RemoveMissingFiles.Result doExecute() {
+    org.apache.iceberg.RemoveMissingFiles rmf = table.newRemoveFiles();
+
+    Dataset<Row> entries = loadMetadataTable(table, MetadataTableType.ENTRIES);
+    Dataset<Row> dataEntries =
+        entries.filter("data_file.content = 0 AND status < 2").select("data_file.*");
+    Dataset<Row> deleteEntries =
+        entries.filter("data_file.content != 0 AND status < 2").select("data_file.*");
+
+    List<DataFile> dataFiles =
+        dataEntries.collectAsList().stream()
+            .map(row -> dataFileWrapper(dataEntries.schema(), row))
+            .collect(Collectors.toList());
+    List<DeleteFile> deleteFiles =
+        deleteEntries.collectAsList().stream()
+            .map(row -> deleteFileWrapper(deleteEntries.schema(), row))
+            .collect(Collectors.toList());
+
+    FileIO fileIO = table.io();
+    List<String> removedDataFiles = Lists.newArrayList();
+    List<String> removedDeleteFiles = Lists.newArrayList();
+
+    for (DataFile f : dataFiles) {
+      if (!fileIO.newInputFile(f.location()).exists()) {

Review Comment:
   These loops check existence in storage sequentially, one call for each data/delete file in the table. I'm not sure how Spark actions handle this in general (I lack experience here), but I've seen in various parts of the code that high-volume storage operations like this are run in parallel using an ExecutorService. Do you think it would make sense to add some parallelism here? I'm wondering how a sequential approach performs on 10k, 100k, or more files. If the time for these checks is negligible, then it's fine as it is. However, on object stores this might be slow; I'm not sure.

##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {
+  /**
+   * Delete a file tracked by a {@link DataFile} from the underlying table.
+   *
+   * @param file a DataFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DataFile file);
+
+  /**
+   * Delete a file tracked by a {@link DeleteFile} from the underlying table.
+   *
+   * @param file a DeleteFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DeleteFile file);
+
+  /**
+   * Enables validation that any files that are part of the deletion still exist when committing the
+   * operation.
+   *
+   * @return this for method chaining
+   */
+  RemoveMissingFiles validateFilesExist();

Review Comment:
   The purpose of this API is to drop missing files. I don't think it makes sense to validate the existence of such files.

##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {
+  /**
+   * Delete a file tracked by a {@link DataFile} from the underlying table.
+   *
+   * @param file a DataFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DataFile file);

Review Comment:
   I'm thinking about how this interface would be used outside of the introduced Spark action. For instance, when a user sees that their table can't be loaded due to a missing file, how would they use this API to fix the table (assuming they don't have Spark but can use the Java API)? They know the file path from the error message, but would they then have to figure out whether it's a data or a delete file? That's probably possible, but it requires an extra manual step.
   And then they have to create a DataFile/DeleteFile object somehow so that they can call this API. This seems more problematic and adds another manual step. I think that if users are to fix tables with this API without the Spark action, the API should rather be something like `RemoveMissingFiles deleteFile(CharSequence path);`, similar to what DeleteFiles does.

##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>
+    implements RemoveMissingFiles {
+  private boolean validateFilesToDeleteExist = false;
+
+  protected BaseRemoveFiles(String tableName, TableOperations ops) {
+    super(tableName, ops);
+  }
+
+  @Override
+  protected RemoveMissingFiles self() {
+    return this;
+  }
+
+  @Override
+  protected String operation() {
+    if (!deletesDeleteFiles()) {
+      return DataOperations.DELETE;
+    }
+
+    return DataOperations.OVERWRITE;

Review Comment:
   Just for my understanding: why is the operation an OVERWRITE if we delete DeleteFiles?
   Is it because rows that were meant to be deleted might reappear? I checked the existing options, and I don't think any of them fits comfortably with what we are trying to do here. Maybe introduce a new `RECOVER` operation or something similar? Another dilemma is whether this class is purely for removing files in general, or whether it has anything to do with missing files and table recovery.

##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>

Review Comment:
   The implemented interface is RemoveMissingFiles, while this class is BaseRemoveFiles. The name loses the information that this has something to do with missing files. I feel the reason is that this patch can't decide whether the new interface is for handling missing files and table recovery in general, or whether it was only introduced because the existing DeleteFiles API doesn't remove DeleteFiles.
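The naming question largely depends on whether the interface is meant for table recovery. For the recovery-oriented direction suggested earlier in this review, a minimal, self-contained sketch could look like the following. It relies on several assumptions. `RecoverTable` and `removeMissingContentFiles` are only the names floated in this review, not existing Iceberg API. `deleteFile(CharSequence path)` mirrors the path-based signature proposed in the `deleteFile(DataFile)` comment. The in-memory map stands in for table metadata, and the `exists` predicate stands in for a storage check such as `FileIO.newInputFile(path).exists()`. The existence checks run on an `ExecutorService`, as raised in the comment on the Spark action.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

public class RecoverTableSketch {

  // Hypothetical stand-in for the DataFile vs. DeleteFile distinction.
  enum ContentKind {
    DATA,
    DELETES
  }

  // Hypothetical recovery-oriented API; names come from the review, not from Iceberg.
  interface RecoverTable {
    // Removes one tracked file by path; the caller does not need to know its content kind.
    RecoverTable deleteFile(CharSequence path);

    // Detects tracked files that no longer exist in storage, removes them, returns their paths.
    List<String> removeMissingContentFiles();
  }

  static class InMemoryRecoverTable implements RecoverTable {
    private final Map<String, ContentKind> liveFiles; // stands in for table metadata
    private final Predicate<String> exists; // stands in for FileIO.newInputFile(path).exists()
    private final int parallelism;

    InMemoryRecoverTable(
        Map<String, ContentKind> liveFiles, Predicate<String> exists, int parallelism) {
      this.liveFiles = liveFiles;
      this.exists = exists;
      this.parallelism = parallelism;
    }

    @Override
    public RecoverTable deleteFile(CharSequence path) {
      liveFiles.remove(path.toString());
      return this;
    }

    @Override
    public List<String> removeMissingContentFiles() {
      List<String> paths = new ArrayList<>(liveFiles.keySet());
      ExecutorService pool = Executors.newFixedThreadPool(parallelism);
      try {
        // One existence check per file, submitted concurrently so slow storage calls overlap.
        List<Future<Boolean>> checks = new ArrayList<>();
        for (String path : paths) {
          checks.add(pool.submit(() -> exists.test(path)));
        }
        List<String> missing = new ArrayList<>();
        for (int i = 0; i < paths.size(); i++) {
          if (!checks.get(i).get()) {
            missing.add(paths.get(i));
          }
        }
        missing.forEach(this::deleteFile);
        return missing;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while checking file existence", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("File existence check failed", e.getCause());
      } finally {
        pool.shutdown();
      }
    }
  }

  public static void main(String[] args) {
    Map<String, ContentKind> files = new LinkedHashMap<>();
    files.put("data/a.parquet", ContentKind.DATA);
    files.put("data/b.parquet", ContentKind.DATA);
    files.put("data/c-deletes.parquet", ContentKind.DELETES);

    // Pretend only data/a.parquet is still present in storage.
    RecoverTable table = new InMemoryRecoverTable(files, p -> p.equals("data/a.parquet"), 4);
    System.out.println(table.removeMissingContentFiles()); // prints [data/b.parquet, data/c-deletes.parquet]
    System.out.println(files.keySet()); // prints [data/a.parquet]
  }
}
```

Because the table already knows whether each tracked path is a data or a delete file, the caller only ever supplies paths, which avoids the manual steps described in the `deleteFile(DataFile)` comment. A real implementation would still have to commit a snapshot and choose an operation type, which this sketch leaves out.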

##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>

Review Comment:
   I find a huge overlap between this class and StreamingDelete. I'm wondering if we can somehow avoid code duplication. Inheritance maybe?
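One way to read the inheritance suggestion is sketched below as a minimal, self-contained example. The shared bookkeeping lives in one abstract base class, and the subclasses differ only in whether they expose delete-file removal. `operation()` mirrors the logic of `BaseRemoveFiles.operation()` in this PR; it reproduces the OVERWRITE choice questioned in this review without endorsing it. All class names and the string constants are hypothetical stand-ins. The real `StreamingDelete`, `MergingSnapshotProducer`, and `DataOperations` are not modeled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SharedRemovalSketch {

  // Hypothetical stand-ins for DataOperations.DELETE and DataOperations.OVERWRITE.
  static final String DELETE = "delete";
  static final String OVERWRITE = "overwrite";

  // Bookkeeping that a data-file-only delete and a content-file removal would otherwise duplicate.
  abstract static class AbstractFileRemoval<T extends AbstractFileRemoval<T>> {
    private final List<String> removedDataFiles = new ArrayList<>();
    private final List<String> removedDeleteFiles = new ArrayList<>();

    protected abstract T self();

    public T deleteDataFile(CharSequence path) {
      removedDataFiles.add(path.toString());
      return self();
    }

    // Protected here; only subclasses that support delete-file removal make it public.
    protected T deleteDeleteFile(CharSequence path) {
      removedDeleteFiles.add(path.toString());
      return self();
    }

    protected boolean deletesDeleteFiles() {
      return !removedDeleteFiles.isEmpty();
    }

    List<String> removedDataFiles() {
      return Collections.unmodifiableList(removedDataFiles);
    }

    // Same decision as BaseRemoveFiles.operation() in this PR's diff.
    protected String operation() {
      return deletesDeleteFiles() ? OVERWRITE : DELETE;
    }
  }

  // Data files only: roughly the role the review attributes to StreamingDelete.
  static class DataFileDelete extends AbstractFileRemoval<DataFileDelete> {
    @Override
    protected DataFileDelete self() {
      return this;
    }
  }

  // Data and delete files: the role BaseRemoveFiles plays in this PR.
  static class ContentFileRemoval extends AbstractFileRemoval<ContentFileRemoval> {
    @Override
    protected ContentFileRemoval self() {
      return this;
    }

    @Override
    public ContentFileRemoval deleteDeleteFile(CharSequence path) {
      return super.deleteDeleteFile(path);
    }
  }

  public static void main(String[] args) {
    DataFileDelete dataOnly = new DataFileDelete().deleteDataFile("data/a.parquet");
    ContentFileRemoval mixed =
        new ContentFileRemoval()
            .deleteDataFile("data/b.parquet")
            .deleteDeleteFile("data/b-deletes.parquet");
    System.out.println(dataOnly.operation()); // prints "delete"
    System.out.println(mixed.operation()); // prints "overwrite"
  }
}
```

The sketch leaves open whether such a base class would sit between `MergingSnapshotProducer` and the two implementations or replace one of them.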