ldudas-marx commented on code in PR #15436: URL: https://github.com/apache/iceberg/pull/15436#discussion_r2960757371
########## core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains references to the hadoop bulk delete API; It will not be available on hadoop 3.3.x + * runtimes. + */ +final class BulkDeleter { + + private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + + /** Resource looked for as an availability probe: {@value}. */ + private static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + + /** Thread pool for deletions. */ + private final ExecutorService executorService; + + /** Configuration for filesystems to retrieve. */ + private final Configuration conf; + + /** + * Constructor. + * + * @param executorService pool for executing bulk delete in parallel + * @param conf hadoop configuration used to instantiate filesystems. + */ + BulkDeleter(ExecutorService executorService, Configuration conf) { + this.executorService = executorService; + this.conf = conf; + } + + /** + * Is the bulk delete API available? + * + * @return true if the bulk delete interface class is on the classpath. + */ + public static boolean apiAvailable() { + return BulkDeleter.class.getClassLoader().getResource(BULK_DELETE_CLASS) != null; + } + + /** + * Bulk delete files. + * + * <p>When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size of at least one. On S3A a larger bulk delete operation is supported, with the page + * size set by {@code fs.s3a.bulk.delete.page.size}. + * + * <p>A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + public int bulkDeleteFiles(Iterable<String> pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // Deletion context for each filesystem, using the root path as lookup. + Map<Path, DeleteContext> contextMap = Maps.newHashMap(); + + // List of ongoing deletion tasks. + List<Future<Outcome>> deletionTasks = Lists.newArrayList(); + + int totalFailedDeletions = 0; + + try { + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; + try { + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, Review Comment: This comment could explain better what is the purpose of this path ########## core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains references to the hadoop bulk delete API; It will not be available on hadoop 3.3.x + * runtimes. + */ +final class BulkDeleter { + + private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + + /** Resource looked for as an availability probe: {@value}. */ + private static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + + /** Thread pool for deletions. */ + private final ExecutorService executorService; + + /** Configuration for filesystems to retrieve. */ + private final Configuration conf; + + /** + * Constructor. + * + * @param executorService pool for executing bulk delete in parallel + * @param conf hadoop configuration used to instantiate filesystems. + */ + BulkDeleter(ExecutorService executorService, Configuration conf) { + this.executorService = executorService; + this.conf = conf; + } + + /** + * Is the bulk delete API available? + * + * @return true if the bulk delete interface class is on the classpath. + */ + public static boolean apiAvailable() { + return BulkDeleter.class.getClassLoader().getResource(BULK_DELETE_CLASS) != null; + } + + /** + * Bulk delete files. + * + * <p>When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size of at least one. On S3A a larger bulk delete operation is supported, with the page + * size set by {@code fs.s3a.bulk.delete.page.size}. + * + * <p>A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + public int bulkDeleteFiles(Iterable<String> pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // Deletion context for each filesystem, using the root path as lookup. + Map<Path, DeleteContext> contextMap = Maps.newHashMap(); + + // List of ongoing deletion tasks. + List<Future<Outcome>> deletionTasks = Lists.newArrayList(); + + int totalFailedDeletions = 0; + + try { + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; + try { + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(new Path("/")); + DeleteContext dc = contextMap.get(fsRoot); + if (dc == null) { + // fs root is not in the map, so create the bulk delete operation for + // that FS and store within a new delete context. + dc = new DeleteContext(fsRoot, fs.createBulkDelete(fsRoot)); + contextMap.put(fsRoot, dc); + } + + // make final for the closure use. + final DeleteContext deleteContext = dc; + + // add the deletion target. + deleteContext.add(target); + + if (deleteContext.pageSizeReached()) { + // the page size has been reached. + // get the live path list, which MUST be done outside the async + // submitted closure. This also resets the context list to prepare + // for more entries. + final Collection<Path> paths = deleteContext.snapshotDeletedFiles(); + // execute the bulk delete in a new thread. + deletionTasks.add(executorService.submit(() -> deleteContext.deleteBatch(paths))); + } + } + + // End of the iteration. Submit deletion batches for all + // entries in the map which haven't yet reached their page size + contextMap.values().stream() + .filter(sd -> !sd.isEmpty()) + .map(sd -> executorService.submit(() -> sd.deleteBatch(sd.deletedFiles()))) Review Comment: `getDeletionTask()` can be also used here. ########## core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains references to the hadoop bulk delete API; It will not be available on hadoop 3.3.x + * runtimes. + */ +final class BulkDeleter { + + private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + + /** Resource looked for as an availability probe: {@value}. */ + private static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + + /** Thread pool for deletions. */ + private final ExecutorService executorService; + + /** Configuration for filesystems to retrieve. */ + private final Configuration conf; + + /** + * Constructor. + * + * @param executorService pool for executing bulk delete in parallel + * @param conf hadoop configuration used to instantiate filesystems. + */ + BulkDeleter(ExecutorService executorService, Configuration conf) { + this.executorService = executorService; + this.conf = conf; + } + + /** + * Is the bulk delete API available? + * + * @return true if the bulk delete interface class is on the classpath. + */ + public static boolean apiAvailable() { + return BulkDeleter.class.getClassLoader().getResource(BULK_DELETE_CLASS) != null; + } + + /** + * Bulk delete files. + * + * <p>When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size of at least one. On S3A a larger bulk delete operation is supported, with the page + * size set by {@code fs.s3a.bulk.delete.page.size}. + * + * <p>A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + public int bulkDeleteFiles(Iterable<String> pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // Deletion context for each filesystem, using the root path as lookup. + Map<Path, DeleteContext> contextMap = Maps.newHashMap(); + + // List of ongoing deletion tasks. + List<Future<Outcome>> deletionTasks = Lists.newArrayList(); + + int totalFailedDeletions = 0; + + try { + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; + try { + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(new Path("/")); + DeleteContext dc = contextMap.get(fsRoot); + if (dc == null) { + // fs root is not in the map, so create the bulk delete operation for + // that FS and store within a new delete context. + dc = new DeleteContext(fsRoot, fs.createBulkDelete(fsRoot)); + contextMap.put(fsRoot, dc); + } + + // make final for the closure use. + final DeleteContext deleteContext = dc; + + // add the deletion target. + deleteContext.add(target); + + if (deleteContext.pageSizeReached()) { + // the page size has been reached. + // get the live path list, which MUST be done outside the async + // submitted closure. This also resets the context list to prepare + // for more entries. + final Collection<Path> paths = deleteContext.snapshotDeletedFiles(); + // execute the bulk delete in a new thread. + deletionTasks.add(executorService.submit(() -> deleteContext.deleteBatch(paths))); Review Comment: The context could have a method `Callable getDeletionTask()`. This method could do the snapshoting and resetting. ########## core/src/main/java/org/apache/iceberg/hadoop/BulkDeleter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains references to the hadoop bulk delete API; It will not be available on hadoop 3.3.x + * runtimes. + */ +final class BulkDeleter { + + private static final Logger LOG = LoggerFactory.getLogger(BulkDeleter.class); + + /** Resource looked for as an availability probe: {@value}. */ + private static final String BULK_DELETE_CLASS = "org/apache/hadoop/fs/BulkDelete.class"; + + /** Thread pool for deletions. */ + private final ExecutorService executorService; + + /** Configuration for filesystems to retrieve. */ + private final Configuration conf; + + /** + * Constructor. + * + * @param executorService pool for executing bulk delete in parallel + * @param conf hadoop configuration used to instantiate filesystems. + */ + BulkDeleter(ExecutorService executorService, Configuration conf) { + this.executorService = executorService; + this.conf = conf; + } + + /** + * Is the bulk delete API available? + * + * @return true if the bulk delete interface class is on the classpath. + */ + public static boolean apiAvailable() { + return BulkDeleter.class.getClassLoader().getResource(BULK_DELETE_CLASS) != null; + } + + /** + * Bulk delete files. + * + * <p>When implemented in the hadoop filesystem APIs, all filesystems support a bulk delete of a + * page size of at least one. On S3A a larger bulk delete operation is supported, with the page + * size set by {@code fs.s3a.bulk.delete.page.size}. + * + * <p>A page of paths to delete is built up for each filesystem; when the page size is reached a + * bulk delete is submitted for execution in a separate thread. + * + * @param pathnames paths to delete. + * @return count of failures. + * @throws UncheckedIOException if an IOE was raised in the invoked methods. + * @throws RuntimeException if interrupted while waiting for deletions to complete. + */ + public int bulkDeleteFiles(Iterable<String> pathnames) { + + LOG.debug("Using bulk delete operation to delete files"); + + // Deletion context for each filesystem, using the root path as lookup. + Map<Path, DeleteContext> contextMap = Maps.newHashMap(); + + // List of ongoing deletion tasks. + List<Future<Outcome>> deletionTasks = Lists.newArrayList(); + + int totalFailedDeletions = 0; + + try { + for (String name : pathnames) { + Path target = new Path(name); + final FileSystem fs; + try { + fs = target.getFileSystem(conf); + } catch (Exception e) { + // any failure to find/load a filesystem + LOG.info("Failed to get filesystem for path: {}; unable to delete it", target, e); + totalFailedDeletions++; + continue; + } + // build root path of the filesystem, + Path fsRoot = fs.makeQualified(new Path("/")); + DeleteContext dc = contextMap.get(fsRoot); + if (dc == null) { + // fs root is not in the map, so create the bulk delete operation for + // that FS and store within a new delete context. + dc = new DeleteContext(fsRoot, fs.createBulkDelete(fsRoot)); + contextMap.put(fsRoot, dc); + } + + // make final for the closure use. + final DeleteContext deleteContext = dc; + + // add the deletion target. + deleteContext.add(target); + + if (deleteContext.pageSizeReached()) { + // the page size has been reached. + // get the live path list, which MUST be done outside the async + // submitted closure. This also resets the context list to prepare + // for more entries. + final Collection<Path> paths = deleteContext.snapshotDeletedFiles(); + // execute the bulk delete in a new thread. + deletionTasks.add(executorService.submit(() -> deleteContext.deleteBatch(paths))); + } + } + + // End of the iteration. Submit deletion batches for all + // entries in the map which haven't yet reached their page size + contextMap.values().stream() + .filter(sd -> !sd.isEmpty()) + .map(sd -> executorService.submit(() -> sd.deleteBatch(sd.deletedFiles()))) + .forEach(deletionTasks::add); + + // Wait for all deletion tasks to complete and report any failures. + LOG.debug("Waiting for {} deletion tasks to complete", deletionTasks.size()); + + for (Future<Outcome> deletionTask : deletionTasks) { + try { + List<DeleteFailure> failedDeletions = deletionTask.get().failures(); Review Comment: We could add a configurable timeout to prevent deadlocks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
