szehon-ho commented on code in PR #11555:
URL: https://github.com/apache/iceberg/pull/11555#discussion_r1903293186
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String versionFileName) {
+    String versionFile = null;
+    if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) {
+      versionFile = tableMetadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : tableMetadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        versionFile = log.file();
+      }
+    }
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", versionFileName);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", versionFile);
+    return versionFile;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return RewriteTablePathUtil.fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+              + " from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of table %s,"
+              + " up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    }
+  }
+
+  /**
+   * Rebuild metadata in a staging location, with paths rewritten.
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifests to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
+    Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
+    Set<Snapshot> validSnapshots =
+        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+    // rebuild manifest-list files
+    RewriteResult<ManifestFile> rewriteManifestListResult =
+        validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, manifestsToRewrite))
+            .reduce(new RewriteResult<>(), RewriteResult::append);
+
+    // rebuild manifest files
+    RewriteContentFileResult rewriteManifestResult =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+    // rebuild position delete files
+    Set<DeleteFile> deleteFiles =
+        rewriteManifestResult.toRewrite().stream()
+            .filter(e -> e instanceof DeleteFile)
+            .map(e -> (DeleteFile) e)
+            .collect(Collectors.toSet());
+    rewritePositionDeletes(endMetadata, deleteFiles);
+
+    Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+    copyPlan.addAll(rewriteVersionResult.copyPlan());
+    copyPlan.addAll(rewriteManifestListResult.copyPlan());
+    copyPlan.addAll(rewriteManifestResult.copyPlan());
+
+    return saveFileList(copyPlan);
+  }
+
+  private String saveFileList(Set<Pair<String, String>> filesToMove) {
+    List<Tuple2<String, String>> fileList =
+        filesToMove.stream()
+            .map(p -> Tuple2.apply(p.first(), p.second()))
+            .collect(Collectors.toList());
+    Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+    String fileListPath = stagingDir + RESULT_LOCATION;
+    fileListDataset
+        .repartition(1)
+        .write()
+        .mode(SaveMode.Overwrite)
+        .format("csv")
+        .save(fileListPath);
+    return fileListPath;
+  }
+
+  private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+    if (startMetadata == null) {
+      return allSnapshots;
+    } else {
+      Set<Long> startSnapshotIds =
+          startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return allSnapshots.stream()
+          .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
+    RewriteResult<Snapshot> result = new RewriteResult<>();
+    result.toRewrite().addAll(endMetadata.snapshots());
+    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+    List<MetadataLogEntry> versions = endMetadata.previousFiles();
+    for (int i = versions.size() - 1; i >= 0; i--) {
+      String versionFilePath = versions.get(i).file();
+      if (versionFilePath.equals(startVersionName)) {
+        break;
+      }
+
+      Preconditions.checkArgument(
+          fileExist(versionFilePath),
+          String.format("Version file %s doesn't exist", versionFilePath));
+      TableMetadata tableMetadata =
+          new StaticTableOperations(versionFilePath, table.io()).current();
+
+      result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, versionFilePath));
+    }
+
+    return result;
+  }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
+    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, stagingDir);
+    TableMetadata newTableMetadata =
+        RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, targetPrefix));
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite filter of manifests to rewrite
+   * @return a result including a copy plan for the manifests contained in the manifest list, as
+   *     well as for the manifest list itself
+   */
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+    String path = snapshot.manifestListLocation();
+    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    RewriteResult<ManifestFile> rewriteResult =
+        RewriteTablePathUtil.rewriteManifestList(
+            snapshot,
+            table.io(),
+            tableMetadata,
+            manifestsToRewrite,
+            sourcePrefix,
+            targetPrefix,
+            stagingDir,
+            outputPath);
+
+    result.append(rewriteResult);
+    // add the manifest list copy plan itself to the result
+    result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, targetPrefix)));
+    return result;
+  }
+
+  private Set<String> manifestsToRewrite(
+      Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+    try {
+      Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
+      if (startMetadata == null) {
+        return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+      } else {
+        Set<Long> deltaSnapshotIds =
+            deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+        return Sets.newHashSet(
+            lastVersionFiles
+                .distinct()
+                .filter(
+                    functions
+                        .column(ManifestFile.SNAPSHOT_ID.name())
+                        .isInCollection(deltaSnapshotIds))
+                .as(Encoders.STRING())
+                .collectAsList());
+      }
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Unable to build the manifest files dataframe. The end version in use may contain invalid snapshots. "
+              + "Please choose an earlier version without invalid snapshots.",
+          e);
+    }
+  }
+
+  public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {

Review Comment:
   Note: this extra subclass is added because the Spark encoder didn't like the parent class (RewriteResult) having a type parameter T, hence the need for a concrete parameterized class. It also needed some extra methods to cleanly aggregate RewriteResult instances of different types, now that the change in https://github.com/apache/iceberg/pull/11555#discussion_r1902167881 makes the data file rewrite also return this class.
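   To illustrate the encoder limitation the comment describes, here is a minimal, self-contained sketch. The types and names below (EncoderSketch, the simplified RewriteResult and ContentFile stand-ins, appendAll) are illustrative and not the PR's actual classes; it assumes only that spark-sql is on the classpath.

   import java.io.Serializable;
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.spark.sql.Encoder;
   import org.apache.spark.sql.Encoders;

   public class EncoderSketch {

     // Simplified stand-in for RewriteTablePathUtil.RewriteResult<T>
     public static class RewriteResult<T> implements Serializable {
       private final Set<T> toRewrite = new HashSet<>();

       public Set<T> toRewrite() {
         return toRewrite;
       }
     }

     // Simplified stand-in for Iceberg's ContentFile interface
     public interface ContentFile<F> extends Serializable {}

     // Fixing the type argument yields a concrete class whose class literal
     // still carries the full element type, so an encoder can be built from it.
     public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
       // extra overload (hypothetical) so results with narrower element types,
       // e.g. data files vs. delete files, can be folded into one result
       public RewriteContentFileResult appendAll(RewriteResult<? extends ContentFile<?>> other) {
         this.toRewrite().addAll(other.toRewrite());
         return this;
       }
     }

     public static void main(String[] args) {
       // Java has no class literal for a parameterized type, so the generic
       // parent cannot be handed to Encoders directly:
       //   Encoders.javaSerialization(RewriteResult<ContentFile<?>>.class)  // does not compile

       // The concrete subclass works:
       Encoder<RewriteContentFileResult> encoder =
           Encoders.javaSerialization(RewriteContentFileResult.class);
       System.out.println(encoder.clsTag());
     }
   }

   Under that assumption, a method like rewriteManifests can declare the concrete subclass as its return type and reduce partial results across Spark tasks without the encoder ever seeing an open type variable.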
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org