gaborkaszab commented on code in PR #12106:
URL: https://github.com/apache/iceberg/pull/12106#discussion_r1931803646


##########
api/src/main/java/org/apache/iceberg/Table.java:
##########
@@ -276,6 +276,17 @@ default AppendFiles newFastAppend() {
    */
   DeleteFiles newDelete();
 
+  /**
+   * Create a new {@link RemoveMissingFiles remove files API} to remove files in this table and
+   * commit.
+   *
+   * @return a new {@link RemoveMissingFiles}
+   */
+  default RemoveMissingFiles newRemoveFiles() {

Review Comment:
   The name newRemoveFiles() might be confusing for users because there is already a newDelete(), and I'm not sure how clear the difference between the two is.



##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {

Review Comment:
   Initially I didn't understand why you needed this new API and why you didn't use the existing DeleteFiles API. I think I get what the motivation is. Is this because DeleteFiles can only delete data files and not delete files?
   
   Even now that I understand the motivation, I'm still confused about this new API for the following reasons:
   1. It partially overlaps with the DeleteFiles API, namely the part where we delete data files.
   2. The name might be misleading: it says "MissingFiles" but in fact it does nothing special with respect to whether the files are missing or not. It just removes files from the table metadata.
   3. Not sure how one could use this API without using the new Spark action (see my comment below).
   
   I probably lack experience in this area, but these might be some options we have here (just throwing out some ideas):
   - If the purpose of this new API is to provide the ability to remove DeleteFiles, then we might want to revisit the existing DeleteFiles API to see if we can extend it. There is probably a reason why it doesn't support removing DeleteFiles, but it would be nice to understand it.
   - If the DeleteFiles API can't be changed for this purpose, then in order to use this API without the Spark action I think a single function with a path parameter is enough. DeleteFiles has the same; we 'just' have to improve it to take care of delete files too, if possible. With this approach the name of this API won't be correct, because it has nothing to do with missing files.
   - This API could be smarter than simply removing files from the table. Since it's called RemoveMissingFiles, it could also do the detection/collection of such missing files and then remove them. With this approach a 'RecoverTable' class/interface name with a 'removeMissingContentFiles' function might be more descriptive.
   - With the above approach, we would later have an API to which we could add further recovery functions to recover from missing metadata.jsons, or missing manifest or snapshot files.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveMissingFilesSparkAction.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRemoveMissingFiles;
+import org.apache.iceberg.actions.RemoveMissingFiles;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+
+public class RemoveMissingFilesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RemoveMissingFilesSparkAction>
+    implements RemoveMissingFiles {
+
+  private final Table table;
+
+  RemoveMissingFilesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RemoveMissingFilesSparkAction self() {
+    return this;
+  }
+
+  @Override
+  public RemoveMissingFiles.Result execute() {
+    String jobDesc = String.format("Removing missing files from %s", table.name());
+    JobGroupInfo info = newJobGroupInfo("REMOVE-MISSING-FILES", jobDesc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private RemoveMissingFiles.Result doExecute() {
+    org.apache.iceberg.RemoveMissingFiles rmf = table.newRemoveFiles();
+
+    Dataset<Row> entries = loadMetadataTable(table, MetadataTableType.ENTRIES);
+    Dataset<Row> dataEntries =
+        entries.filter("data_file.content = 0 AND status < 2").select("data_file.*");
+    Dataset<Row> deleteEntries =
+        entries.filter("data_file.content != 0 AND status < 2").select("data_file.*");
+
+    List<DataFile> dataFiles =
+        dataEntries.collectAsList().stream()
+            .map(row -> dataFileWrapper(dataEntries.schema(), row))
+            .collect(Collectors.toList());
+    List<DeleteFile> deleteFiles =
+        deleteEntries.collectAsList().stream()
+            .map(row -> deleteFileWrapper(deleteEntries.schema(), row))
+            .collect(Collectors.toList());
+
+    FileIO fileIO = table.io();
+    List<String> removedDataFiles = Lists.newArrayList();
+    List<String> removedDeleteFiles = Lists.newArrayList();
+
+    for (DataFile f : dataFiles) {
+      if (!fileIO.newInputFile(f.location()).exists()) {

Review Comment:
   These sequentially do an existence check on the storage for each of the data/delete files in the table. I'm not sure how Spark actions do these in general (lack of experience), but I saw in various parts of the code that such storage operations on high volumes are performed in parallel using an ExecutorService. Do you think it would make sense to add some parallelism here?
   I'm wondering how a sequential approach performs in general on 10k, 100k, etc. files. If the time for these checks is negligible, then it's fine as it is. However, on object stores this might be slow, not sure.



##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {
+  /**
+   * Delete a file tracked by a {@link DataFile} from the underlying table.
+   *
+   * @param file a DataFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DataFile file);
+
+  /**
+   * Delete a file tracked by a {@link DeleteFile} from the underlying table.
+   *
+   * @param file a DeleteFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DeleteFile file);
+
+  /**
+   * Enables validation that any files that are part of the deletion still exist when committing the
+   * operation.
+   *
+   * @return this for method chaining
+   */
+  RemoveMissingFiles validateFilesExist();

Review Comment:
   The purpose of this API is to drop missing files. I don't think it makes 
sense to validate existence of such files.



##########
api/src/main/java/org/apache/iceberg/RemoveMissingFiles.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/**
+ * API for deleting files from a table. This is intended for use in removing missing files, both
+ * {@link DataFile data files} and {@link DeleteFile delete files}.
+ *
+ * <p>This API accumulates file deletions, produces a new {@link Snapshot} of the table, and commits
+ * that snapshot as the current.
+ *
+ * <p>When committing, these changes will be applied to the latest table snapshot. Commit conflicts
+ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit.
+ */
+public interface RemoveMissingFiles extends SnapshotUpdate<RemoveMissingFiles> {
+  /**
+   * Delete a file tracked by a {@link DataFile} from the underlying table.
+   *
+   * @param file a DataFile to remove from the table
+   * @return this for method chaining
+   */
+  RemoveMissingFiles deleteFile(DataFile file);

Review Comment:
   I'm thinking of how this interface would be used if not from the introduced Spark action. For instance, when a user sees that their table can't be loaded due to a missing file, how would they use this API to fix the table (assuming they don't have Spark but can use the Java API)?
   
   They know a file path from the error message, but should they then figure out whether it's a data file or a delete file? It's probably possible but requires an extra manual step. Then they have to create a DataFile/DeleteFile object somehow so that they can call this API. This seems more problematic, and adds another manual step.
   
   I think if users want to use this API to fix tables without using the Spark action, then the API should rather be something like this:
   `RemoveMissingFiles deleteFile(CharSequence path);`
   Similarly to what DeleteFiles does.
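   
   A rough sketch of the behaviour I have in mind, using stand-in types only. Nothing here is Iceberg code: `PathBasedRemovalSketch`, its `Kind` enum and the `track`/`isTracked` helpers are hypothetical and exist just to show that the caller passes a path, and the implementation (not the user) works out whether it belongs to a data file or a delete file.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for table metadata; not the Iceberg API.
final class PathBasedRemovalSketch {
  // Stand-in for the content type of a tracked file.
  enum Kind { DATA, DELETE }

  private final Map<String, Kind> tracked = new LinkedHashMap<>();
  private final Set<String> pending = new LinkedHashSet<>();

  // Registers a file as being referenced by the (pretend) table metadata.
  PathBasedRemovalSketch track(String path, Kind kind) {
    tracked.put(path, kind);
    return this;
  }

  boolean isTracked(String path) {
    return tracked.containsKey(path);
  }

  // Path-only entry point: the caller does not need to know the file kind.
  PathBasedRemovalSketch deleteFile(CharSequence path) {
    String key = path.toString();
    if (!tracked.containsKey(key)) {
      throw new IllegalArgumentException("Path is not tracked by the table: " + key);
    }
    pending.add(key);
    return this;
  }

  // Applies the pending removals and reports which kind each removed path had.
  Map<String, Kind> commit() {
    Map<String, Kind> removed = new LinkedHashMap<>();
    for (String path : pending) {
      removed.put(path, tracked.remove(path));
    }
    pending.clear();
    return removed;
  }
}
```

   With something like this, a user who only has the path from the error message could call `deleteFile(path)` and commit, without constructing a DataFile/DeleteFile by hand.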



##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>
+    implements RemoveMissingFiles {
+  private boolean validateFilesToDeleteExist = false;
+
+  protected BaseRemoveFiles(String tableName, TableOperations ops) {
+    super(tableName, ops);
+  }
+
+  @Override
+  protected RemoveMissingFiles self() {
+    return this;
+  }
+
+  @Override
+  protected String operation() {
+    if (!deletesDeleteFiles()) {
+      return DataOperations.DELETE;
+    }
+
+    return DataOperations.OVERWRITE;

Review Comment:
   Just for my understanding: why is the operation an OVERWRITE if we delete DeleteFiles? Is it because rows that were meant to be deleted might reappear?
   I checked the existing options and I don't think any of them applies comfortably to what we're trying to do here. Maybe introduce a new `RECOVER` operation or something similar?
   Well, another dilemma here is whether this class is purely for removing files in general, or whether it has anything to do with missing files and table recovery.



##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>

Review Comment:
   The parent interface is RemoveMissingFiles while this class is BaseRemoveFiles. In the name we lose the information that this has something to do with missing files.
   I feel the reason for this is that this patch can't decide whether this new interface is for handling missing files and table recovery in general, or whether it is only introduced because the existing DeleteFiles API doesn't remove DeleteFiles.



##########
core/src/main/java/org/apache/iceberg/BaseRemoveFiles.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** {@link RemoveMissingFiles} implementation. */
+public class BaseRemoveFiles extends MergingSnapshotProducer<RemoveMissingFiles>

Review Comment:
   I find a huge overlap between this class and StreamingDelete. I'm wondering 
if we can somehow avoid code duplication. Inheritance maybe?


