aokolnychyi commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1154058565
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java:
##########
@@ -18,79 +18,23 @@
  */
 package org.apache.iceberg.spark;
 
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class FileRewriteCoordinator {
+public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
   private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();
 
-  private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap();
-
   private FileRewriteCoordinator() {}
 
   public static FileRewriteCoordinator get() {
     return INSTANCE;
   }
 
-  /**
-   * Called to persist the output of a rewrite action for a specific group. Since the write is done
-   * via a Spark Datasource, we have to propagate the result through this side-effect call.
-   *
-   * @param table table where the rewrite is occurring
-   * @param fileSetID the id used to identify the source set of files being rewritten
-   * @param newDataFiles the new files which have been written
-   */
-  public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
-    LOG.debug(
-        "Staging the output for {} - fileset {} with {} files",
-        table.name(),
-        fileSetID,
-        newDataFiles.size());
-    Pair<String, String> id = toID(table, fileSetID);
-    resultMap.put(id, newDataFiles);
-  }
-
-  public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
-    Pair<String, String> id = toID(table, fileSetID);
-    Set<DataFile> result = resultMap.get(id);
-    ValidationException.check(
-        result != null, "No results for rewrite of file set %s in table %s", fileSetID, table);
-
-    return result;
-  }
-
-  public void clearRewrite(Table table, String fileSetID) {
-    LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID);
-    Pair<String, String> id = toID(table, fileSetID);
-    resultMap.remove(id);
-  }
-
-  public Set<String> fetchSetIDs(Table table) {

Review Comment:
   I believe we renamed it to `fetchSetIds` instead of `fetchSetIDs`, so we have to keep the old method for now.
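   For illustration only, a rough sketch of what keeping the old accessor could look like in the Spark 3.3 subclass. It assumes the renamed method inherited from `BaseFileRewriteCoordinator` is `fetchSetIds(Table)` returning `Set<String>`; that name and signature are an assumption here, not taken from this diff.

```java
// Illustrative sketch only, not the actual patch in this PR. It assumes
// BaseFileRewriteCoordinator exposes fetchSetIds(Table) returning Set<String>;
// the deprecated fetchSetIDs simply delegates to it so existing callers keep
// compiling until they migrate to the new name.
package org.apache.iceberg.spark;

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> {

  private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();

  private FileRewriteCoordinator() {}

  public static FileRewriteCoordinator get() {
    return INSTANCE;
  }

  /**
   * Kept for API compatibility after the rename.
   *
   * @deprecated use {@link #fetchSetIds(Table)} instead
   */
  @Deprecated
  public Set<String> fetchSetIDs(Table table) {
    return fetchSetIds(table); // delegate to the renamed method on the base class
  }
}
```

   Callers could then move to `fetchSetIds` at their own pace while the deprecated shim keeps the old entry point working.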