snazy commented on code in PR #3256:
URL: https://github.com/apache/polaris/pull/3256#discussion_r2741673229


##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileOperations.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.stream.Stream;
+
+/**
+ * Object storage file operations, used to find files below a given prefix, to 
purge files, to
+ * identify referenced files, etc.
+ *
+ * <p>All functions of this interface rather yield incomplete results and 
continue over throwing
+ * exceptions.
+ */
+public interface FileOperations {
+  /**
+   * Find files that match the given prefix and filter.
+   *
+   * <p>Whether existing but inaccessible files are included in the result 
depends on the object
+   * store.
+   *
+   * <p>Call sites should consider rate-limiting the scan operations, for 
example, by using Guava's
+   * {@code RateLimiter} via a {@code Stream.map(x -> { rateLimiter.acquire(); 
return x; }} step on
+   * the returned stream.
+   *
+   * @param prefix full object storage URI prefix, including scheme and bucket.
+   * @param filter file filter
+   * @return a stream of file specs with the {@link 
FileSpec#createdAtMillis()} and {@link
+   *     FileSpec#size()} attributes populated with the information provided 
by the object store.
+   *     The {@link FileSpec#fileType() file type} attribute is not populated, 
it may be {@link
+   *     FileSpec#guessTypeFromName() guessed}.
+   */
+  Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull FileFilter 
filter);
+
+  /**
+   * Identifies all files referenced by the given table-metadata.
+   *
+   * <p>In case "container" files, like the metadata, manifest-list or 
manifest files, are not
+   * readable, the returned stream will just not include those.
+   *
+   * <p>Rate-limiting the returned stream is recommended when identifying 
multiple tables and/or
+   * views. Rate-limiting on a single invocation may not be effective as 
expected.
+   *
+   * @param tableMetadataLocation Iceberg table-metadata location
+   * @param deduplicate if true, attempt to deduplicate files by their 
location, adding additional
+   *     heap pressure to the operation. Implementations may ignore this 
parameter or may not

Review Comment:
   Deduplication is (sadly) needed.
   
   When iterating over a snapshot's manifests, we do not have any information 
whether a specific manifest file has been "seen" before, or whether it has been 
"taken over" from another snapshot.
   
   This means we would be processing the same manifest files multiple times. 
The same is true for data/delete files references from the manifest files.
   
   Note that newer snapshots reference previous manifest files, as returned by 
`Snapshot.allManifests(FileIO)`. Since there is no better way than that 
function, we have to either bite that bullet and re-read the same 
manifest-files multiple times or have a somewhat working deduplication 
mechanism, as implemented. The implemented deduplicator only considers the most 
recent 10,000 manifest file paths now.



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/FileType.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import org.apache.iceberg.ContentFile;
+
+public enum FileType {
+  UNKNOWN(false, false),
+  ICEBERG_METADATA(true, false),
+  ICEBERG_STATISTICS(false, false),

Review Comment:
   The term stems from Icebergs 
`org.apache.iceberg.TableMetadata#statisticsFiles`. I'd prefer to keep that in 
sync.



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.OptionalDouble;
+import java.util.function.Consumer;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+@SuppressWarnings("unused")
+@PolarisImmutable
+public interface PurgeSpec {
+  PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build();
+
+  @Value.Default
+  default FileFilter fileFilter() {
+    return FileFilter.alwaysTrue();

Review Comment:
   The API (and implementation) is not only useful for purge-something 
operations (from the README: `API and implementations to perform long-running 
operations against object stores.`)



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeStats.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import java.time.Duration;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+@PolarisImmutable
+public interface PurgeStats {
+  Duration duration();

Review Comment:
   Docs added to the whole interface.



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java:
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.RateLimiter;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.polaris.storage.files.api.FileFilter;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileSpec;
+import org.apache.polaris.storage.files.api.FileType;
+import org.apache.polaris.storage.files.api.ImmutablePurgeStats;
+import org.apache.polaris.storage.files.api.PurgeSpec;
+import org.apache.polaris.storage.files.api.PurgeStats;
+import org.projectnessie.storage.uri.StorageUri;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param fileIO the {@link FileIO} instance to use. The given instance must 
implement both {@link
+ *     org.apache.iceberg.io.SupportsBulkOperations} and {@link
+ *     org.apache.iceberg.io.SupportsPrefixOperations}.
+ */
+record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileOperationsImpl.class);
+
+  @Override
+  public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull 
FileFilter filter) {
+    var prefixUri = StorageUri.of(prefix).resolve("/");
+    if (fileIO instanceof SupportsPrefixOperations prefixOps) {
+      return Streams.stream(prefixOps.listPrefix(prefix).iterator())
+          .filter(Objects::nonNull)
+          .map(
+              fileInfo -> {
+                var location = StorageUri.of(fileInfo.location());
+                if (!location.isAbsolute()) {
+                  // ADLSFileIO does _not_ include the prefix, but GCSFileIO 
and S3FileIO do.
+                  location = prefixUri.resolve(location);
+                }
+                return FileSpec.builder()
+                    .location(location.toString())
+                    .size(fileInfo.size())
+                    .createdAtMillis(fileInfo.createdAtMillis())
+                    .build();
+              })
+          .filter(filter);
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting prefix operations is required, but 
the given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergTableFiles(
+      @Nonnull String tableMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+    var metadata = metadataOpt.get();
+
+    var metadataFileSpec =
+        
FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    var fileSources = new ArrayList<Stream<FileSpec>>();
+
+    fileSources.add(Stream.of(metadataFileSpec));
+
+    var statisticsFiles = metadata.statisticsFiles();
+    if (statisticsFiles != null) {
+      fileSources.addFirst(
+          statisticsFiles.stream()
+              .map(
+                  statisticsFile ->
+                      FileSpec.fromLocationAndSize(
+                              statisticsFile.path(), 
statisticsFile.fileSizeInBytes())
+                          .fileType(FileType.ICEBERG_STATISTICS)
+                          .build()));
+    }
+
+    var previousFiles = metadata.previousFiles();
+    if (previousFiles != null) {
+      fileSources.add(
+          previousFiles.stream()
+              .filter(
+                  metadataLogEntry ->
+                      metadataLogEntry.file() != null && 
!metadataLogEntry.file().isEmpty())
+              .map(
+                  metadataLogEntry ->
+                      FileSpec.fromLocation(metadataLogEntry.file())
+                          .fileType(FileType.ICEBERG_METADATA)
+                          .build()));
+    }
+
+    var specsById = metadata.specsById();
+
+    var addPredicate = deduplicator(deduplicate);
+
+    fileSources.addFirst(
+        metadata.snapshots().stream()
+            // Newest snapshots first
+            .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), 
s1.timestampMillis()))
+            .flatMap(
+                snapshot -> identifyIcebergTableSnapshotFiles(snapshot, 
specsById, addPredicate)));
+
+    // Return "dependencies" before the "metadata" itself, so the probability 
of being able to
+    // resume a failed/aborted purge is higher.
+
+    return fileSources.stream().flatMap(Function.identity());
+  }
+
+  static Predicate<String> deduplicator(boolean deduplicate) {

Review Comment:
   See my other comment, deduplication is sadly needed, simply because we do 
not have the required information is not available.



##########
storage/files/api/src/main/java/org/apache/polaris/storage/files/api/PurgeSpec.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.OptionalDouble;
+import java.util.function.Consumer;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+@SuppressWarnings("unused")
+@PolarisImmutable
+public interface PurgeSpec {
+  PurgeSpec DEFAULT_INSTANCE = PurgeSpec.builder().build();
+
+  @Value.Default
+  default FileFilter fileFilter() {
+    return FileFilter.alwaysTrue();
+  }
+
+  PurgeSpec withFileFilter(FileFilter fileFilter);
+
+  /**
+   * Delete batch size for purge/batch-deletion operations. Implementations 
may opt to ignore this
+   * parameter and enforce a reasonable or required different limit.
+   */
+  @Value.Default
+  default int deleteBatchSize() {
+    return 250;
+  }
+
+  PurgeSpec withDeleteBatchSize(int deleteBatchSize);
+
+  /**
+   * Callback being invoked right before a file location is being submitted to 
be purged.
+   *
+   * <p>Due to API constraints of {@link
+   * org.apache.iceberg.io.SupportsBulkOperations#deleteFiles(Iterable)} it's 
barely possible to
+   * identify files that failed a deletion.
+   */
+  @Value.Default
+  default Consumer<String> purgeIssuedCallback() {
+    return location -> {};
+  }
+
+  PurgeSpec withPurgeIssuedCallback(Consumer<String> purgeIssuedCallback);
+
+  /**
+   * Optional rate-limit on the number of individual file-deletions per second.

Review Comment:
   True, it (the implementation) could fall-back to individual file-deletions, 
if batch-deletions are not available. But that's a way slower (and more 
money-expensive?).



##########
storage/files/impl/src/main/resources/META-INF/beans.xml:
##########
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee 
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd";>
+    <!-- File required by Weld (used for testing), not by Quarkus -->

Review Comment:
   It is likely going to be used soon.



##########
storage/files/impl/src/main/java/org/apache/polaris/storage/files/impl/FileOperationsImpl.java:
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+
+import com.google.common.collect.Streams;
+import com.google.common.util.concurrent.RateLimiter;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewMetadataParser;
+import org.apache.polaris.storage.files.api.FileFilter;
+import org.apache.polaris.storage.files.api.FileOperations;
+import org.apache.polaris.storage.files.api.FileSpec;
+import org.apache.polaris.storage.files.api.FileType;
+import org.apache.polaris.storage.files.api.ImmutablePurgeStats;
+import org.apache.polaris.storage.files.api.PurgeSpec;
+import org.apache.polaris.storage.files.api.PurgeStats;
+import org.projectnessie.storage.uri.StorageUri;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @param fileIO the {@link FileIO} instance to use. The given instance must 
implement both {@link
+ *     org.apache.iceberg.io.SupportsBulkOperations} and {@link
+ *     org.apache.iceberg.io.SupportsPrefixOperations}.
+ */
+record FileOperationsImpl(@Nonnull FileIO fileIO) implements FileOperations {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileOperationsImpl.class);
+
+  @Override
+  public Stream<FileSpec> findFiles(@Nonnull String prefix, @Nonnull 
FileFilter filter) {
+    var prefixUri = StorageUri.of(prefix).resolve("/");
+    if (fileIO instanceof SupportsPrefixOperations prefixOps) {
+      return Streams.stream(prefixOps.listPrefix(prefix).iterator())
+          .filter(Objects::nonNull)
+          .map(
+              fileInfo -> {
+                var location = StorageUri.of(fileInfo.location());
+                if (!location.isAbsolute()) {
+                  // ADLSFileIO does _not_ include the prefix, but GCSFileIO 
and S3FileIO do.
+                  location = prefixUri.resolve(location);
+                }
+                return FileSpec.builder()
+                    .location(location.toString())
+                    .size(fileInfo.size())
+                    .createdAtMillis(fileInfo.createdAtMillis())
+                    .build();
+              })
+          .filter(filter);
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting prefix operations is required, but 
the given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergTableFiles(
+      @Nonnull String tableMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+    var metadata = metadataOpt.get();
+
+    var metadataFileSpec =
+        
FileSpec.fromLocation(tableMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    var fileSources = new ArrayList<Stream<FileSpec>>();
+
+    fileSources.add(Stream.of(metadataFileSpec));
+
+    var statisticsFiles = metadata.statisticsFiles();
+    if (statisticsFiles != null) {
+      fileSources.addFirst(
+          statisticsFiles.stream()
+              .map(
+                  statisticsFile ->
+                      FileSpec.fromLocationAndSize(
+                              statisticsFile.path(), 
statisticsFile.fileSizeInBytes())
+                          .fileType(FileType.ICEBERG_STATISTICS)
+                          .build()));
+    }
+
+    var previousFiles = metadata.previousFiles();
+    if (previousFiles != null) {
+      fileSources.add(
+          previousFiles.stream()
+              .filter(
+                  metadataLogEntry ->
+                      metadataLogEntry.file() != null && 
!metadataLogEntry.file().isEmpty())
+              .map(
+                  metadataLogEntry ->
+                      FileSpec.fromLocation(metadataLogEntry.file())
+                          .fileType(FileType.ICEBERG_METADATA)
+                          .build()));
+    }
+
+    var specsById = metadata.specsById();
+
+    var addPredicate = deduplicator(deduplicate);
+
+    fileSources.addFirst(
+        metadata.snapshots().stream()
+            // Newest snapshots first
+            .sorted((s1, s2) -> Long.compare(s2.timestampMillis(), 
s1.timestampMillis()))
+            .flatMap(
+                snapshot -> identifyIcebergTableSnapshotFiles(snapshot, 
specsById, addPredicate)));
+
+    // Return "dependencies" before the "metadata" itself, so the probability 
of being able to
+    // resume a failed/aborted purge is higher.
+
+    return fileSources.stream().flatMap(Function.identity());
+  }
+
+  static Predicate<String> deduplicator(boolean deduplicate) {
+    if (!deduplicate) {
+      return x -> true;
+    }
+    var set = new LinkedHashSet<String>();
+    return location -> {
+      synchronized (set) {
+        if (set.size() > 100_000) {
+          // limit the heap pressure of the deduplication set to 100,000 
elements
+          set.removeFirst();
+        }
+        return set.add(location);
+      }
+    };
+  }
+
+  Stream<FileSpec> identifyIcebergTableSnapshotFiles(
+      @Nonnull Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+    var manifestListLocation = snapshot.manifestListLocation();
+    if (manifestListLocation != null && 
!addPredicate.test(manifestListLocation)) {
+      return Stream.empty();
+    }
+
+    return identifyIcebergManifests(manifestListLocation, snapshot, specsById, 
addPredicate);
+  }
+
+  Stream<FileSpec> identifyIcebergManifests(
+      String manifestListLocation,
+      Snapshot snapshot,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestListFileSpecStream = Stream.<FileSpec>empty();
+
+    if (manifestListLocation != null && !manifestListLocation.isEmpty()) {
+      var manifestListFileSpec =
+          FileSpec.fromLocation(manifestListLocation)
+              .fileType(FileType.ICEBERG_MANIFEST_LIST)
+              .build();
+      manifestListFileSpecStream = Stream.of(manifestListFileSpec);
+    }
+
+    try {
+      var allManifestsFiles =
+          snapshot.allManifests(fileIO).stream()
+              .filter(manifestFile -> addPredicate.test(manifestFile.path()))
+              .flatMap(
+                  manifestFile ->
+                      identifyIcebergManifestDataFiles(manifestFile, 
specsById, addPredicate));
+
+      // Return "dependencies" before the "metadata" itself, so a 
failed/aborted purge can be
+      // resumed.
+      return Stream.of(allManifestsFiles, 
manifestListFileSpecStream).flatMap(Function.identity());
+    } catch (Exception e) {
+      LOGGER.warn("Failure reading manifest list file {}: {}", 
manifestListLocation, e.toString());
+      LOGGER.debug("Failure reading manifest list file {}", 
manifestListLocation);
+      return manifestListFileSpecStream;
+    }
+  }
+
+  @SuppressWarnings("UnnecessaryDefault")
+  private Stream<FileSpec> identifyIcebergManifestDataFiles(
+      ManifestFile manifestFile,
+      Map<Integer, PartitionSpec> specsById,
+      Predicate<String> addPredicate) {
+
+    var manifestFileSpec =
+        FileSpec.fromLocationAndSize(manifestFile.path(), 
manifestFile.length())
+            .fileType(FileType.ICEBERG_MANIFEST_FILE)
+            .build();
+
+    try (var contentFilesIter =
+        switch (manifestFile.content()) {
+          case DATA -> ManifestFiles.read(manifestFile, fileIO).iterator();
+          case DELETES ->
+              ManifestFiles.readDeleteManifest(manifestFile, fileIO, 
specsById).iterator();
+          default -> {
+            LOGGER.warn(
+                "Unsupported content type {} in manifest {}",
+                manifestFile.content(),
+                manifestFile.path());
+            yield CloseableIterator.<ContentFile<? extends 
ContentFile<?>>>empty();
+          }
+        }) {
+
+      // Cannot leverage streaming here and eagerly build a list, as the 
manifest-file reader needs
+      // to be closed.
+      var files = new ArrayList<FileSpec>();
+      while (contentFilesIter.hasNext()) {
+        var contentFile = contentFilesIter.next();
+        if (addPredicate.test(contentFile.location())) {
+          files.add(
+              FileSpec.fromLocationAndSize(contentFile.location(), 
contentFile.fileSizeInBytes())
+                  .fileType(FileType.fromContentFile(contentFile))
+                  .build());
+        }
+      }
+      // Return "dependencies" before the "metadata" itself, so the 
probability of being able to
+      // resume a failed/aborted purge is higher.
+      files.add(manifestFileSpec);
+
+      return files.stream();
+    } catch (IOException e) {
+      LOGGER.warn("Failure reading manifest file {}: {}", manifestFile.path(), 
e.toString());
+      LOGGER.debug("Failure reading manifest file {}", manifestFile.path(), e);
+      return Stream.of(manifestFileSpec);
+    }
+  }
+
+  @Override
+  public Stream<FileSpec> identifyIcebergViewFiles(
+      @Nonnull String viewMetadataLocation, boolean deduplicate) {
+    var metadataOpt = readViewMetadataFailsafe(viewMetadataLocation);
+    if (metadataOpt.isEmpty()) {
+      return Stream.empty();
+    }
+
+    var metadataFileSpec =
+        
FileSpec.fromLocation(viewMetadataLocation).fileType(FileType.ICEBERG_METADATA).build();
+
+    return Stream.of(metadataFileSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergTable(@Nonnull String tableMetadataLocation, 
PurgeSpec purgeSpec) {
+    var files =
+        identifyIcebergTableFiles(tableMetadataLocation, 
true).filter(purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergTableBaseLocation(
+      @Nonnull String tableMetadataLocation, PurgeSpec purgeSpec) {
+    var metadata = readTableMetadataFailsafe(tableMetadataLocation);
+    if (metadata.isEmpty()) {
+      return ImmutablePurgeStats.builder()
+          .duration(Duration.ZERO)
+          .purgedFiles(0L)
+          .failedPurges(1)
+          .build();
+    }
+
+    var baseLocation = metadata.get().location();
+    var files = findFiles(baseLocation, purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergView(@Nonnull String viewMetadataLocation, 
PurgeSpec purgeSpec) {
+    var files =
+        identifyIcebergViewFiles(viewMetadataLocation, 
false).filter(purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeIcebergViewBaseLocation(
+      @Nonnull String viewMetadataLocation, PurgeSpec purgeSpec) {
+    var metadata = readViewMetadataFailsafe(viewMetadataLocation);
+    if (metadata.isEmpty()) {
+      return ImmutablePurgeStats.builder()
+          .duration(Duration.ZERO)
+          .purgedFiles(0L)
+          .failedPurges(1)
+          .build();
+    }
+
+    var baseLocation = metadata.get().location();
+    var files = findFiles(baseLocation, purgeSpec.fileFilter());
+    return purge(files, purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purge(@Nonnull Stream<FileSpec> locationStream, PurgeSpec 
purgeSpec) {
+    return purgeFiles(locationStream.map(FileSpec::location), purgeSpec);
+  }
+
+  @Override
+  public PurgeStats purgeFiles(@Nonnull Stream<String> locationStream, 
PurgeSpec purgeSpec) {
+    if (fileIO instanceof SupportsBulkOperations bulkOps) {
+      var startedNanos = System.nanoTime();
+
+      var iter = locationStream.iterator();
+
+      var batcher = new PurgeBatcher(purgeSpec, bulkOps);
+      while (iter.hasNext()) {
+        batcher.add(iter.next());
+      }
+      batcher.flush();
+
+      return ImmutablePurgeStats.builder()
+          .purgedFiles(batcher.purged)
+          .failedPurges(batcher.failed)
+          .duration(Duration.ofNanos(System.nanoTime() - startedNanos))
+          .build();
+    }
+
+    throw new IllegalStateException(
+        format(
+            "An Iceberg FileIO supporting bulk operations is required, but the 
given %s does not",
+            fileIO.getClass().getName()));
+  }
+
+  @SuppressWarnings("UnstableApiUsage")
+  static final class PurgeBatcher {
+    private final PurgeSpec purgeSpec;
+    private final SupportsBulkOperations bulkOps;
+
+    private final int deleteBatchSize;
+    // Using a `Set` prevents duplicate paths in a single bulk-deletion.
+
+    private final Set<String> batch = new HashSet<>();
+
+    private final Runnable fileDeleteRateLimiter;
+    private final Runnable batchDeleteRateLimiter;
+
+    long purged = 0L;
+
+    long failed = 0L;
+
+    PurgeBatcher(PurgeSpec purgeSpec, SupportsBulkOperations bulkOps) {
+      var implSpecificLimit = implSpecificDeleteBatchLimit(bulkOps);
+
+      this.deleteBatchSize = Math.min(implSpecificLimit, 
Math.max(purgeSpec.deleteBatchSize(), 1));
+
+      this.purgeSpec = purgeSpec;
+      this.bulkOps = bulkOps;
+
+      fileDeleteRateLimiter = createLimiter(purgeSpec.fileDeletesPerSecond());
+      batchDeleteRateLimiter = 
createLimiter(purgeSpec.batchDeletesPerSecond());
+    }
+
+    private static Runnable createLimiter(OptionalDouble optionalDouble) {
+      if (optionalDouble.isEmpty()) {
+        // unlimited
+        return () -> {};
+      }
+      var limiter = RateLimiter.create(optionalDouble.getAsDouble());
+      return limiter::acquire;
+    }
+
+    void add(String location) {
+      fileDeleteRateLimiter.run();
+      batch.add(location);
+
+      if (batch.size() >= deleteBatchSize) {
+        flush();
+      }
+    }
+
+    void flush() {
+      int size = batch.size();
+      if (size > 0) {
+        batch.forEach(purgeSpec.purgeIssuedCallback());
+        try {
+          batchDeleteRateLimiter.run();
+          bulkOps.deleteFiles(batch);
+          purged += size;
+        } catch (BulkDeletionFailureException e) {
+          // Object stores do delete the files that exist, but a 
BulkDeletionFailureException is
+          // still being thrown.
+          // However, not all FileIO implementations behave the same way as 
some don't throw in the
+          // non-existent-case.

Review Comment:
   This is another reason why the implementaiton cannot yield correct values 
for number of files (not) purged.



##########
storage/files/impl/src/testFixtures/java/org/apache/polaris/storage/files/impl/IcebergFixtures.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.storage.files.impl;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.IcebergBridge;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.types.Types;
+import org.projectnessie.catalog.formats.iceberg.IcebergSpec;
+import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataContent;
+import org.projectnessie.catalog.formats.iceberg.manifest.IcebergDataFile;
+import org.projectnessie.catalog.formats.iceberg.manifest.IcebergFileFormat;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestContent;
+import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntry;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestEntryStatus;
+import org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFile;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriter;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestFileWriterSpec;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriter;
+import 
org.projectnessie.catalog.formats.iceberg.manifest.IcebergManifestListWriterSpec;
+import org.projectnessie.catalog.formats.iceberg.meta.IcebergPartitionSpec;
+import org.projectnessie.catalog.formats.iceberg.meta.IcebergSchema;
+
+public class IcebergFixtures {
+  public final Schema schema;
+  public final IcebergSchema nessieIcebergSchema;
+  public final PartitionSpec spec;
+  public final TableMetadata tableMetadata;
+  public final String tableMetadataString;
+  public final byte[] tableMetadataBytes;
+
+  public final String prefix;
+  public final int numSnapshots;
+  public final int numManifestFiles;
+  public final int numDataFiles;
+
+  public IcebergFixtures(String prefix, int numSnapshots, int 
numManifestFiles, int numDataFiles) {
+    this.prefix = prefix;
+    this.numSnapshots = numSnapshots;
+    this.numManifestFiles = numManifestFiles;
+    this.numDataFiles = numDataFiles;
+
+    schema = new Schema(1, Types.NestedField.required(1, "foo", 
Types.StringType.get()));
+    try {
+      nessieIcebergSchema =
+          new ObjectMapper().readValue(SchemaParser.toJson(schema), 
IcebergSchema.class);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    spec = PartitionSpec.unpartitioned();
+
+    var tableMetadataBuilder =
+        TableMetadata.buildFrom(
+            TableMetadata.newTableMetadata(schema, spec, prefix, 
Map.of()).withUUID());
+    for (var snapshotId = 1; snapshotId <= numSnapshots; snapshotId++) {
+      var manifestList = manifestListPath(snapshotId);
+      var snapshot =
+          IcebergBridge.mockSnapshot(
+              snapshotId + 1,
+              snapshotId + 1,
+              snapshotId > 0 ? (long) snapshotId : null,
+              System.currentTimeMillis(),
+              "APPEND",
+              Map.of(),
+              schema.schemaId(),
+              manifestList,
+              (long) numManifestFiles * numManifestFiles);
+      tableMetadataBuilder.addSnapshot(snapshot);
+    }
+    tableMetadata = tableMetadataBuilder.build();
+
+    tableMetadataString = TableMetadataParser.toJson(tableMetadata);
+    tableMetadataBytes = tableMetadataString.getBytes(UTF_8);
+  }
+
+  public String manifestListPath(int snapshotId) {
+    return format("%s%05d/snap-%d.avro", prefix, snapshotId, snapshotId);
+  }
+
+  public byte[] serializedManifestList(long snapshotId) {
+    var output = new ByteArrayOutputStream();
+    try (var manifestListWriter =
+        IcebergManifestListWriter.openManifestListWriter(
+            IcebergManifestListWriterSpec.builder()
+                .snapshotId(snapshotId)
+                .sequenceNumber(snapshotId)
+                .parentSnapshotId(snapshotId > 0 ? snapshotId - 1 : null)
+                .partitionSpec(IcebergPartitionSpec.UNPARTITIONED_SPEC)
+                .spec(IcebergSpec.V2)
+                .schema(nessieIcebergSchema)
+                .build(),
+            output)) {
+      for (int i = 0; i < numManifestFiles; i++) {
+        var manifestPath = manifestFilePath(snapshotId, i);

Review Comment:
   Good point.
   I think, we should revisit the implementation and tests esp when V3 is 
available here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to