dramaticlly commented on code in PR #11555:
URL: https://github.com/apache/iceberg/pull/11555#discussion_r1902174767


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", 
versionFile);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;

Review Comment:
   Since this is only used on line 210 above (in `validateVersion`) and is
   immediately followed by a null check, maybe we can just throw an exception
   here instead?



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Rewrite table path action. */
+public class RewriteTablePathUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathUtil.class);
+
+  private RewriteTablePathUtil() {}
+
+  /**
+   * Rewrite result.
+   *
+   * @param <T> type of file to rewrite
+   */
+  public static class RewriteResult<T> implements Serializable {
+    private final Set<T> toRewrite = Sets.newHashSet();
+    private final Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+
+    public RewriteResult() {}
+
+    public RewriteResult<T> append(RewriteResult<T> r1) {
+      toRewrite.addAll(r1.toRewrite);
+      copyPlan.addAll(r1.copyPlan);
+      return this;
+    }
+
+    /** Returns next list of files to rewrite (discovered by rewriting this 
file) */
+    public Set<T> toRewrite() {
+      return toRewrite;
+    }
+
+    /**
+     * Returns a copy plan of files whose metadata were rewritten, for each 
file a source and target
+     * location
+     */
+    public Set<Pair<String, String>> copyPlan() {
+      return copyPlan;
+    }
+  }
+
+  /**
+   * Create a new table metadata object, replacing path references
+   *
+   * @param metadata source table metadata
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return copy of table metadata with paths replaced
+   */
+  public static TableMetadata replacePaths(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    String newLocation = metadata.location().replaceFirst(sourcePrefix, 
targetPrefix);
+    List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, 
sourcePrefix, targetPrefix);
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries =
+        updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix);
+    long snapshotId =
+        metadata.currentSnapshot() == null ? -1 : 
metadata.currentSnapshot().snapshotId();
+    Map<String, String> properties =
+        updateProperties(metadata.properties(), sourcePrefix, targetPrefix);
+
+    return new TableMetadata(
+        null,
+        metadata.formatVersion(),
+        metadata.uuid(),
+        newLocation,
+        metadata.lastSequenceNumber(),
+        metadata.lastUpdatedMillis(),
+        metadata.lastColumnId(),
+        metadata.currentSchemaId(),
+        metadata.schemas(),
+        metadata.defaultSpecId(),
+        metadata.specs(),
+        metadata.lastAssignedPartitionId(),
+        metadata.defaultSortOrderId(),
+        metadata.sortOrders(),
+        properties,
+        snapshotId,
+        newSnapshots,
+        null,
+        metadata.snapshotLog(),
+        metadataLogEntries,
+        metadata.refs(),
+        metadata.statisticsFiles(),
+        metadata.partitionStatisticsFiles(),
+        metadata.changes());
+  }
+
+  private static Map<String, String> updateProperties(
+      Map<String, String> tableProperties, String sourcePrefix, String 
targetPrefix) {
+    Map<String, String> properties = Maps.newHashMap(tableProperties);
+    updatePathInProperty(properties, sourcePrefix, targetPrefix, 
TableProperties.OBJECT_STORE_PATH);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_DATA_LOCATION);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_METADATA_LOCATION);
+
+    return properties;
+  }
+
+  private static void updatePathInProperty(
+      Map<String, String> properties,
+      String sourcePrefix,
+      String targetPrefix,
+      String propertyName) {
+    if (properties.containsKey(propertyName)) {
+      properties.put(
+          propertyName, newPath(properties.get(propertyName), sourcePrefix, 
targetPrefix));
+    }
+  }
+
+  private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries =
+        Lists.newArrayListWithCapacity(metadata.previousFiles().size());
+    for (TableMetadata.MetadataLogEntry metadataLog : 
metadata.previousFiles()) {
+      TableMetadata.MetadataLogEntry newMetadataLog =
+          new TableMetadata.MetadataLogEntry(
+              metadataLog.timestampMillis(),
+              newPath(metadataLog.file(), sourcePrefix, targetPrefix));
+      metadataLogEntries.add(newMetadataLog);
+    }
+    return metadataLogEntries;
+  }
+
+  private static List<Snapshot> updatePathInSnapshots(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    List<Snapshot> newSnapshots = 
Lists.newArrayListWithCapacity(metadata.snapshots().size());
+    for (Snapshot snapshot : metadata.snapshots()) {
+      String newManifestListLocation =
+          newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix);
+      Snapshot newSnapshot =
+          new BaseSnapshot(
+              snapshot.sequenceNumber(),
+              snapshot.snapshotId(),
+              snapshot.parentId(),
+              snapshot.timestampMillis(),
+              snapshot.operation(),
+              snapshot.summary(),
+              snapshot.schemaId(),
+              newManifestListLocation);
+      newSnapshots.add(newSnapshot);
+    }
+    return newSnapshots;
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot, replacing path 
references.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param io file io
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite a list of manifest files to filter for rewrite
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingDir staging directory
+   * @param outputPath location to write the manifest list
+   * @return a copy plan for manifest files whose metadata were contained in 
the rewritten manifest
+   *     list
+   */
+  public static RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot,
+      FileIO io,
+      TableMetadata tableMetadata,
+      Set<String> manifestsToRewrite,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingDir,
+      String outputPath) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+    OutputFile outputFile = io.newOutputFile(outputPath);
+
+    List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
+    List<ManifestFile> manifestFilesToRewrite =
+        manifestFiles.stream()
+            .filter(mf -> manifestsToRewrite.contains(mf.path()))
+            .collect(Collectors.toList());
+    manifestFilesToRewrite.forEach(
+        mf ->
+            Preconditions.checkArgument(
+                mf.path().startsWith(sourcePrefix),
+                "Encountered manifest file %s not under the source prefix %s",
+                mf.path(),
+                sourcePrefix));
+
+    try (FileAppender<ManifestFile> writer =
+        ManifestLists.write(
+            tableMetadata.formatVersion(),
+            outputFile,
+            snapshot.snapshotId(),
+            snapshot.parentId(),
+            snapshot.sequenceNumber())) {
+
+      for (ManifestFile file : manifestFilesToRewrite) {
+        ManifestFile newFile = file.copy();
+        ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, 
targetPrefix));
+        writer.add(newFile);
+
+        result.toRewrite().add(file);
+        result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), 
newFile.path()));
+      }
+      return result;
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          "Failed to rewrite the manifest list file " + 
snapshot.manifestListLocation(), e);
+    }
+  }
+
+  private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, 
Snapshot snapshot) {
+    String path = snapshot.manifestListLocation();
+    List<ManifestFile> manifestFiles = Lists.newLinkedList();
+    try {
+      manifestFiles = ManifestLists.read(io.newInputFile(path));
+    } catch (RuntimeIOException e) {
+      LOG.warn("Failed to read manifest list {}", path, e);
+    }
+    return manifestFiles;
+  }
+
+  /**
+   * Rewrite a data manifest, replacing path references.
+   *
+   * @param manifestFile source manifest file to rewrite
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return a copy plan of content files in the manifest that was rewritten
+   */
+  public static List<Pair<String, String>> rewriteManifest(
+      ManifestFile manifestFile,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, 
targetPrefix, writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Rewrite a delete manifest, replacing path references.
+   *
+   * @param manifestFile source delete manifest to rewrite
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingLocation staging location for rewritten files (referred 
delete file will be
+   *     rewritten here)
+   * @return a copy plan of content files in the manifest that was rewritten
+   */
+  public static RewriteResult<DeleteFile> rewriteDeleteManifest(
+      ManifestFile manifestFile,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry ->
+                  writeDeleteFileEntry(
+                      entry, spec, sourcePrefix, targetPrefix, 
stagingLocation, writer))
+          .reduce(new RewriteResult<>(), RewriteResult::append);
+    }
+  }
+
+  private static Pair<String, String> writeDataFileEntry(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());
+  }
+
+  private static RewriteResult<DeleteFile> writeDeleteFileEntry(
+      ManifestEntry<DeleteFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation,
+      ManifestWriter<DeleteFile> writer) {
+
+    DeleteFile file = entry.file();
+    RewriteResult<DeleteFile> result = new RewriteResult<>();
+
+    switch (file.content()) {
+      case POSITION_DELETES:
+        String targetDeleteFilePath = newPath(file.location(), sourcePrefix, 
targetPrefix);
+        Metrics metricsWithTargetPath =
+            ContentFileUtil.replacePathBounds(file, sourcePrefix, 
targetPrefix);
+        DeleteFile movedFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .copy(file)
+                .withPath(targetDeleteFilePath)
+                .withMetrics(metricsWithTargetPath)
+                .build();
+        appendEntryWithFile(entry, writer, movedFile);
+        result
+            .copyPlan()
+            .add(Pair.of(stagingPath(file.location(), stagingLocation), 
movedFile.location()));
+        result.toRewrite().add(file);
+        return result;
+      case EQUALITY_DELETES:
+        DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, 
sourcePrefix, targetPrefix);
+        appendEntryWithFile(entry, writer, eqDeleteFile);
+        // we do not need to recursively rewrite the equality delete, just 
move it
+        result.copyPlan().add(Pair.of(file.location(), 
eqDeleteFile.location()));
+        return result;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported delete file type: 
" + file.content());
+    }
+  }
+
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
+
+    switch (entry.status()) {
+      case ADDED:
+        writer.add(file);
+        break;
+      case EXISTING:
+        writer.existing(
+            file, entry.snapshotId(), entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+      case DELETED:
+        writer.delete(file, entry.dataSequenceNumber(), 
entry.fileSequenceNumber());
+        break;
+    }
+  }
+
+  private static DeleteFile newEqualityDeleteEntry(
+      DeleteFile file, PartitionSpec spec, String sourcePrefix, String 
targetPrefix) {
+    String path = file.location();
+
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }
+    int[] equalityFieldIds = 
file.equalityFieldIds().stream().mapToInt(Integer::intValue).toArray();
+    String newPath = newPath(path, sourcePrefix, targetPrefix);
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofEqualityDeletes(equalityFieldIds)
+        .copy(file)
+        .withPath(newPath)
+        .withSplitOffsets(file.splitOffsets())
+        .build();
+  }
+
+  /** Class providing engine-specific methods to read and write position 
delete files. */
+  public interface PositionDeleteReaderWriter extends Serializable {
+    CloseableIterable<Record> reader(InputFile inputFile, FileFormat format, 
PartitionSpec spec);
+
+    PositionDeleteWriter<Record> writer(
+        OutputFile outputFile,
+        FileFormat format,
+        PartitionSpec spec,
+        StructLike partition,
+        Schema rowSchema)
+        throws IOException;
+  }
+
+  /**
+   * Rewrite a position delete file, replacing path references.
+   *
+   * @param deleteFile source delete file to be rewritten
+   * @param outputFile output file to rewrite delete file to
+   * @param io file io
+   * @param spec spec of delete file
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix to replace it
+   * @param posDeleteReaderWriter class to read and write position delete files
+   */
+  public static void rewritePositionDeleteFile(
+      DeleteFile deleteFile,
+      OutputFile outputFile,
+      FileIO io,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      PositionDeleteReaderWriter posDeleteReaderWriter)
+      throws IOException {
+    String path = deleteFile.location();
+    if (!path.startsWith(sourcePrefix)) {
+      throw new UnsupportedOperationException(
+          "Expected delete file to be under the source prefix: "
+              + sourcePrefix
+              + " but was "
+              + path);
+    }

Review Comment:
   nit: how about
   `String.format("Expected delete file %s to start with prefix: %s", path, sourcePrefix)`?



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Rewrite table path action. */
+public class RewriteTablePathUtil {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathUtil.class);
+
+  private RewriteTablePathUtil() {}
+
+  /**
+   * Rewrite result.
+   *
+   * @param <T> type of file to rewrite
+   */
+  public static class RewriteResult<T> implements Serializable {
+    private final Set<T> toRewrite = Sets.newHashSet();
+    private final Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+
+    public RewriteResult() {}
+
+    public RewriteResult<T> append(RewriteResult<T> r1) {
+      toRewrite.addAll(r1.toRewrite);
+      copyPlan.addAll(r1.copyPlan);
+      return this;
+    }
+
+    /** Returns next list of files to rewrite (discovered by rewriting this 
file) */
+    public Set<T> toRewrite() {
+      return toRewrite;
+    }
+
+    /**
+     * Returns a copy plan of files whose metadata were rewritten, for each 
file a source and target
+     * location
+     */
+    public Set<Pair<String, String>> copyPlan() {
+      return copyPlan;
+    }
+  }
+
+  /**
+   * Create a new table metadata object, replacing path references
+   *
+   * @param metadata source table metadata
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return copy of table metadata with paths replaced
+   */
+  public static TableMetadata replacePaths(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    String newLocation = metadata.location().replaceFirst(sourcePrefix, 
targetPrefix);
+    List<Snapshot> newSnapshots = updatePathInSnapshots(metadata, 
sourcePrefix, targetPrefix);
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries =
+        updatePathInMetadataLogs(metadata, sourcePrefix, targetPrefix);
+    long snapshotId =
+        metadata.currentSnapshot() == null ? -1 : 
metadata.currentSnapshot().snapshotId();
+    Map<String, String> properties =
+        updateProperties(metadata.properties(), sourcePrefix, targetPrefix);
+
+    return new TableMetadata(
+        null,
+        metadata.formatVersion(),
+        metadata.uuid(),
+        newLocation,
+        metadata.lastSequenceNumber(),
+        metadata.lastUpdatedMillis(),
+        metadata.lastColumnId(),
+        metadata.currentSchemaId(),
+        metadata.schemas(),
+        metadata.defaultSpecId(),
+        metadata.specs(),
+        metadata.lastAssignedPartitionId(),
+        metadata.defaultSortOrderId(),
+        metadata.sortOrders(),
+        properties,
+        snapshotId,
+        newSnapshots,
+        null,
+        metadata.snapshotLog(),
+        metadataLogEntries,
+        metadata.refs(),
+        metadata.statisticsFiles(),
+        metadata.partitionStatisticsFiles(),
+        metadata.changes());
+  }
+
+  private static Map<String, String> updateProperties(
+      Map<String, String> tableProperties, String sourcePrefix, String 
targetPrefix) {
+    Map<String, String> properties = Maps.newHashMap(tableProperties);
+    updatePathInProperty(properties, sourcePrefix, targetPrefix, 
TableProperties.OBJECT_STORE_PATH);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_DATA_LOCATION);
+    updatePathInProperty(
+        properties, sourcePrefix, targetPrefix, 
TableProperties.WRITE_METADATA_LOCATION);
+
+    return properties;
+  }
+
+  private static void updatePathInProperty(
+      Map<String, String> properties,
+      String sourcePrefix,
+      String targetPrefix,
+      String propertyName) {
+    if (properties.containsKey(propertyName)) {
+      properties.put(
+          propertyName, newPath(properties.get(propertyName), sourcePrefix, 
targetPrefix));
+    }
+  }
+
+  private static List<TableMetadata.MetadataLogEntry> updatePathInMetadataLogs(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries =
+        Lists.newArrayListWithCapacity(metadata.previousFiles().size());
+    for (TableMetadata.MetadataLogEntry metadataLog : 
metadata.previousFiles()) {
+      TableMetadata.MetadataLogEntry newMetadataLog =
+          new TableMetadata.MetadataLogEntry(
+              metadataLog.timestampMillis(),
+              newPath(metadataLog.file(), sourcePrefix, targetPrefix));
+      metadataLogEntries.add(newMetadataLog);
+    }
+    return metadataLogEntries;
+  }
+
+  private static List<Snapshot> updatePathInSnapshots(
+      TableMetadata metadata, String sourcePrefix, String targetPrefix) {
+    List<Snapshot> newSnapshots = 
Lists.newArrayListWithCapacity(metadata.snapshots().size());
+    for (Snapshot snapshot : metadata.snapshots()) {
+      String newManifestListLocation =
+          newPath(snapshot.manifestListLocation(), sourcePrefix, targetPrefix);
+      Snapshot newSnapshot =
+          new BaseSnapshot(
+              snapshot.sequenceNumber(),
+              snapshot.snapshotId(),
+              snapshot.parentId(),
+              snapshot.timestampMillis(),
+              snapshot.operation(),
+              snapshot.summary(),
+              snapshot.schemaId(),
+              newManifestListLocation);
+      newSnapshots.add(newSnapshot);
+    }
+    return newSnapshots;
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot, replacing path 
references.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param io file io
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite a list of manifest files to filter for rewrite
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingDir staging directory
+   * @param outputPath location to write the manifest list
+   * @return a copy plan for manifest files whose metadata were contained in 
the rewritten manifest
+   *     list
+   */
+  public static RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot,
+      FileIO io,
+      TableMetadata tableMetadata,
+      Set<String> manifestsToRewrite,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingDir,
+      String outputPath) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+    OutputFile outputFile = io.newOutputFile(outputPath);
+
+    List<ManifestFile> manifestFiles = manifestFilesInSnapshot(io, snapshot);
+    List<ManifestFile> manifestFilesToRewrite =
+        manifestFiles.stream()
+            .filter(mf -> manifestsToRewrite.contains(mf.path()))
+            .collect(Collectors.toList());
+    manifestFilesToRewrite.forEach(
+        mf ->
+            Preconditions.checkArgument(
+                mf.path().startsWith(sourcePrefix),
+                "Encountered manifest file %s not under the source prefix %s",
+                mf.path(),
+                sourcePrefix));
+
+    try (FileAppender<ManifestFile> writer =
+        ManifestLists.write(
+            tableMetadata.formatVersion(),
+            outputFile,
+            snapshot.snapshotId(),
+            snapshot.parentId(),
+            snapshot.sequenceNumber())) {
+
+      for (ManifestFile file : manifestFilesToRewrite) {
+        ManifestFile newFile = file.copy();
+        ((StructLike) newFile).set(0, newPath(newFile.path(), sourcePrefix, 
targetPrefix));
+        writer.add(newFile);
+
+        result.toRewrite().add(file);
+        result.copyPlan().add(Pair.of(stagingPath(file.path(), stagingDir), 
newFile.path()));
+      }
+      return result;
+    } catch (IOException e) {
+      throw new UncheckedIOException(
+          "Failed to rewrite the manifest list file " + 
snapshot.manifestListLocation(), e);
+    }
+  }
+
+  private static List<ManifestFile> manifestFilesInSnapshot(FileIO io, 
Snapshot snapshot) {
+    String path = snapshot.manifestListLocation();
+    List<ManifestFile> manifestFiles = Lists.newLinkedList();
+    try {
+      manifestFiles = ManifestLists.read(io.newInputFile(path));
+    } catch (RuntimeIOException e) {
+      LOG.warn("Failed to read manifest list {}", path, e);
+    }
+    return manifestFiles;
+  }
+
+  /**
+   * Rewrite a data manifest, replacing path references.
+   *
+   * @param manifestFile source manifest file to rewrite
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return a copy plan of content files in the manifest that was rewritten
+   */
+  public static List<Pair<String, String>> rewriteManifest(
+      ManifestFile manifestFile,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    try (ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, 
specsById).select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(entry -> writeDataFileEntry(entry, spec, sourcePrefix, 
targetPrefix, writer))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Rewrite a delete manifest, replacing path references.
+   *
+   * @param manifestFile source delete manifest to rewrite
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingLocation staging location for rewritten files (referred 
delete file will be
+   *     rewritten here)
+   * @return a copy plan of content files in the manifest that was rewritten
+   */
+  public static RewriteResult<DeleteFile> rewriteDeleteManifest(
+      ManifestFile manifestFile,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingLocation)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    try (ManifestWriter<DeleteFile> writer =
+            ManifestFiles.writeDeleteManifest(format, spec, outputFile, 
manifestFile.snapshotId());
+        ManifestReader<DeleteFile> reader =
+            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+      return StreamSupport.stream(reader.entries().spliterator(), false)
+          .map(
+              entry ->
+                  writeDeleteFileEntry(
+                      entry, spec, sourcePrefix, targetPrefix, 
stagingLocation, writer))
+          .reduce(new RewriteResult<>(), RewriteResult::append);
+    }
+  }
+
+  private static Pair<String, String> writeDataFileEntry(
+      ManifestEntry<DataFile> entry,
+      PartitionSpec spec,
+      String sourcePrefix,
+      String targetPrefix,
+      ManifestWriter<DataFile> writer) {
+    DataFile dataFile = entry.file();
+    String sourceDataFilePath = dataFile.location();
+    Preconditions.checkArgument(
+        sourceDataFilePath.startsWith(sourcePrefix),
+        "Encountered data file %s not under the source prefix %s",
+        sourceDataFilePath,
+        sourcePrefix);
+    String targetDataFilePath = newPath(sourceDataFilePath, sourcePrefix, 
targetPrefix);
+    DataFile newDataFile =
+        
DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
+    appendEntryWithFile(entry, writer, newDataFile);
+    return Pair.of(sourceDataFilePath, newDataFile.location());

Review Comment:
   nit: Looks like `writeDataFileEntry` and `writeDeleteFileEntry` return `Pair` 
and `RewriteResult` respectively, and the same goes for all their callers up the 
stack. I am wondering if we should use the generic `RewriteResult` for both?



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Tuple2;
+
+public class TestRewriteTablePathsAction extends TestBase {
+
+  @TempDir private Path staging;
+  @TempDir private Path tableDir;
+  @TempDir private Path newTableDir;
+  @TempDir private Path targetTableDir;
+
+  protected ActionsProvider actions() {
+    return SparkActions.get();
+  }
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  protected static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected String tableLocation = null;
+  private Table table = null;
+
+  private final String ns = "testns";
+  private final String backupNs = "backupns";
+
+  @BeforeEach
+  public void setupTableLocation() throws Exception {
+    this.tableLocation = tableDir.toFile().toURI().toString();
+    this.table = createATableWith2Snapshots(tableLocation);
+    createNameSpaces();
+  }
+
+  @AfterEach
+  public void cleanupTableSetup() throws Exception {
+    dropNameSpaces();
+  }
+
+  private Table createATableWith2Snapshots(String location) {
+    return createTableWithSnapshots(location, 2);
+  }
+
+  private Table createTableWithSnapshots(String location, int snapshotNumber) {
+    return createTableWithSnapshots(location, snapshotNumber, 
Maps.newHashMap());
+  }
+
+  protected Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties) {
+    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    }
+
+    return newTable;
+  }
+
+  private void createNameSpaces() {
+    sql("CREATE DATABASE IF NOT EXISTS %s", ns);
+    sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
+  }
+
+  private void dropNameSpaces() {
+    sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
+    sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
+  }
+
+  @Test
+  public void testRewritePath() throws Exception {
+    String targetTableLocation = targetTableLocation();
+
+    // check the data file location before the rebuild
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFiles.size()).isEqualTo(2);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation)
+            .endVersion("v3.metadata.json")
+            .execute();
+
+    assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+    checkFileNum(3, 2, 2, 9, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // verify the data file path after the rebuild
+    List<String> validDataFilesAfterRebuilt =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFilesAfterRebuilt.size()).isEqualTo(2);
+    for (String item : validDataFilesAfterRebuilt) {
+      assertThat(item).startsWith(targetTableLocation);
+    }

Review Comment:
   This can be simplified with:
   ```java
   assertThat(validDataFilesAfterRebuilt)
       .hasSize(2)
       .allMatch(item -> item.startsWith(targetTableLocation));
   ```



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Tuple2;
+
+public class TestRewriteTablePathsAction extends TestBase {
+
+  @TempDir private Path staging;
+  @TempDir private Path tableDir;
+  @TempDir private Path newTableDir;
+  @TempDir private Path targetTableDir;
+
+  protected ActionsProvider actions() {
+    return SparkActions.get();
+  }
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  protected static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected String tableLocation = null;
+  private Table table = null;
+
+  private final String ns = "testns";
+  private final String backupNs = "backupns";
+
+  @BeforeEach
+  public void setupTableLocation() throws Exception {
+    this.tableLocation = tableDir.toFile().toURI().toString();
+    this.table = createATableWith2Snapshots(tableLocation);
+    createNameSpaces();
+  }
+
+  @AfterEach
+  public void cleanupTableSetup() throws Exception {
+    dropNameSpaces();
+  }
+
+  private Table createATableWith2Snapshots(String location) {
+    return createTableWithSnapshots(location, 2);
+  }
+
+  private Table createTableWithSnapshots(String location, int snapshotNumber) {
+    return createTableWithSnapshots(location, snapshotNumber, 
Maps.newHashMap());
+  }
+
+  protected Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties) {
+    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    }
+
+    return newTable;
+  }
+
+  private void createNameSpaces() {
+    sql("CREATE DATABASE IF NOT EXISTS %s", ns);
+    sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
+  }
+
+  private void dropNameSpaces() {
+    sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
+    sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
+  }
+
+  @Test
+  public void testRewritePath() throws Exception {
+    String targetTableLocation = targetTableLocation();
+
+    // check the data file location before the rebuild
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFiles.size()).isEqualTo(2);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation)
+            .endVersion("v3.metadata.json")
+            .execute();
+
+    assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+    checkFileNum(3, 2, 2, 9, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // verify the data file path after the rebuild
+    List<String> validDataFilesAfterRebuilt =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFilesAfterRebuilt.size()).isEqualTo(2);
+    for (String item : validDataFilesAfterRebuilt) {
+      assertThat(item).startsWith(targetTableLocation);
+    }
+
+    // verify data rows
+    Dataset<Row> resultDF = 
spark.read().format("iceberg").load(targetTableLocation);
+    List<ThreeColumnRecord> actualRecords =
+        resultDF.sort("c1", "c2", 
"c3").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));

Review Comment:
   I am wondering if we can get `expectedRecords` by reading from the source 
table and comparing them against the copied table?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", 
versionFile);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return RewriteTablePathUtil.fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, 
endVersionName);
+    }
+  }
+
+  /**
+   * Rebuild metadata in a staging location, with paths rewritten.
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifest to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, 
table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, 
table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || 
endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = 
rewriteVersionFiles(endMetadata);
+    Set<Snapshot> diffSnapshots =
+        getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(diffSnapshots, 
startMetadata);
+    Set<Snapshot> validSnapshots =
+        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+    // rebuild manifest-list files
+    RewriteResult<ManifestFile> rewriteManifestListResult =
+        validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, 
manifestsToRewrite))
+            .reduce(new RewriteResult<>(), RewriteResult::append);
+
+    // rebuild manifest files
+    RewriteResult<DeleteFile> rewriteManifestResult =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+    // rebuild position delete files
+    rewritePositionDeletes(endMetadata, rewriteManifestResult.toRewrite());
+
+    Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+    copyPlan.addAll(rewriteVersionResult.copyPlan());
+    copyPlan.addAll(rewriteManifestListResult.copyPlan());
+    copyPlan.addAll(rewriteManifestResult.copyPlan());
+
+    return saveFileList(copyPlan);
+  }
+
+  private String saveFileList(Set<Pair<String, String>> filesToMove) {
+    List<Tuple2<String, String>> fileList =
+        filesToMove.stream()
+            .map(p -> Tuple2.apply(p.first(), p.second()))
+            .collect(Collectors.toList());
+    Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), 
Encoders.STRING()));
+    String fileListPath = stagingDir + RESULT_LOCATION;
+    fileListDataset
+        .repartition(1)
+        .write()
+        .mode(SaveMode.Overwrite)
+        .format("csv")
+        .save(fileListPath);
+    return fileListPath;
+  }
+
+  private Set<Snapshot> getDiffSnapshotIds(
+      TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+    if (startMetadata == null) {
+      return allSnapshots;
+    } else {
+      Set<Long> startSnapshotIds =
+          
startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return allSnapshots.stream()
+          .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata 
endMetadata) {
+    RewriteResult<Snapshot> result = new RewriteResult<>();
+    result.toRewrite().addAll(endMetadata.snapshots());
+    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+    List<MetadataLogEntry> versions = endMetadata.previousFiles();
+    for (int i = versions.size() - 1; i >= 0; i--) {
+      String versionFilePath = versions.get(i).file();
+      if (versionFilePath.equals(startVersionName)) {
+        break;
+      }
+
+      Preconditions.checkArgument(
+          fileExist(versionFilePath),
+          String.format("Version file %s doesn't exist", versionFilePath));
+      TableMetadata tableMetadata =
+          new StaticTableOperations(versionFilePath, table.io()).current();
+
+      result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, 
versionFilePath));
+    }
+
+    return result;
+  }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, 
String versionFilePath) {
+    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    TableMetadata newTableMetadata =
+        RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, 
targetPrefix));
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite filter of manifests to rewrite.
+   * @return a result including a copy plan for the manifests contained in the 
manifest list, as
+   *     well as for the manifest list itself
+   */
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> 
manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+    String path = snapshot.manifestListLocation();
+    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    RewriteResult<ManifestFile> rewriteResult =
+        RewriteTablePathUtil.rewriteManifestList(
+            snapshot,
+            table.io(),
+            tableMetadata,
+            manifestsToRewrite,
+            sourcePrefix,
+            targetPrefix,
+            stagingDir,
+            outputPath);
+
+    result.append(rewriteResult);
+    // add the manifest list copy plan itself to the result
+    result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, 
targetPrefix)));
+    return result;
+  }
+
+  private Set<String> manifestsToRewrite(Set<Snapshot> diffSnapshots, 
TableMetadata startMetadata) {
+    try {
+      Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = 
manifestDS(endStaticTable).select("path");
+      if (startMetadata == null) {
+        return 
Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+      } else {
+        Set<Long> diffSnapshotIds =
+            
diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+        return Sets.newHashSet(
+            lastVersionFiles
+                .distinct()
+                
.filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds))

Review Comment:
   nit: it might be worth using a reference instead of a hardcoded string: 
`.filter(functions.column(ManifestFile.SNAPSHOT_ID.name()).isInCollection(diffSnapshotIds))`



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", 
versionFile);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return RewriteTablePathUtil.fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, 
endVersionName);
+    }
+  }
+
+  /**
+   * Rebuild metadata in a staging location, with paths rewritten.
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifest to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, 
table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, 
table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || 
endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = 
rewriteVersionFiles(endMetadata);
+    Set<Snapshot> diffSnapshots =
+        getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite());

Review Comment:
   nit: I think `deltaSnapshots` might read a bit better than `diffSnapshots`



##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Tuple2;
+
+public class TestRewriteTablePathsAction extends TestBase {
+
+  @TempDir private Path staging;
+  @TempDir private Path tableDir;
+  @TempDir private Path newTableDir;
+  @TempDir private Path targetTableDir;
+
+  protected ActionsProvider actions() {
+    return SparkActions.get();
+  }
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  protected static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.IntegerType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  protected String tableLocation = null;
+  private Table table = null;
+
+  private final String ns = "testns";
+  private final String backupNs = "backupns";
+
+  @BeforeEach
+  public void setupTableLocation() throws Exception {
+    this.tableLocation = tableDir.toFile().toURI().toString();
+    this.table = createATableWith2Snapshots(tableLocation);
+    createNameSpaces();
+  }
+
+  @AfterEach
+  public void cleanupTableSetup() throws Exception {
+    dropNameSpaces();
+  }
+
+  private Table createATableWith2Snapshots(String location) {
+    return createTableWithSnapshots(location, 2);
+  }
+
+  private Table createTableWithSnapshots(String location, int snapshotNumber) {
+    return createTableWithSnapshots(location, snapshotNumber, 
Maps.newHashMap());
+  }
+
+  protected Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties) {
+    Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
properties, location);
+
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    Dataset<Row> df = spark.createDataFrame(records, 
ThreeColumnRecord.class).coalesce(1);
+
+    for (int i = 0; i < snapshotNumber; i++) {
+      df.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    }
+
+    return newTable;
+  }
+
+  private void createNameSpaces() {
+    sql("CREATE DATABASE IF NOT EXISTS %s", ns);
+    sql("CREATE DATABASE IF NOT EXISTS %s", backupNs);
+  }
+
+  private void dropNameSpaces() {
+    sql("DROP DATABASE IF EXISTS %s CASCADE", ns);
+    sql("DROP DATABASE IF EXISTS %s CASCADE", backupNs);
+  }
+
+  @Test
+  public void testRewritePath() throws Exception {
+    String targetTableLocation = targetTableLocation();
+
+    // check the data file location before the rebuild
+    List<String> validDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFiles.size()).isEqualTo(2);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(table)
+            .rewriteLocationPrefix(tableLocation, targetTableLocation)
+            .endVersion("v3.metadata.json")
+            .execute();
+
+    assertThat(result.latestVersion()).isEqualTo("v3.metadata.json");
+
+    checkFileNum(3, 2, 2, 9, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // verify the data file path after the rebuild
+    List<String> validDataFilesAfterRebuilt =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation + "#files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(validDataFilesAfterRebuilt.size()).isEqualTo(2);
+    for (String item : validDataFilesAfterRebuilt) {
+      assertThat(item).startsWith(targetTableLocation);
+    }
+
+    // verify data rows
+    Dataset<Row> resultDF = 
spark.read().format("iceberg").load(targetTableLocation);
+    List<ThreeColumnRecord> actualRecords =
+        resultDF.sort("c1", "c2", 
"c3").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    expectedRecords.add(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+
+    assertThat(expectedRecords).isEqualTo(actualRecords);
+  }
+
+  @Test
+  public void testSameLocations() throws Exception {

Review Comment:
   nit: looks like we don't need `throws Exception` here



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteTablePathUtil;
+import org.apache.iceberg.RewriteTablePathUtil.PositionDeleteReaderWriter;
+import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadata.MetadataLogEntry;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.actions.ImmutableRewriteTablePath;
+import org.apache.iceberg.actions.RewriteTablePath;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class RewriteTablePathSparkAction extends 
BaseSparkAction<RewriteTablePath>
+    implements RewriteTablePath {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RewriteTablePathSparkAction.class);
+  private static final String RESULT_LOCATION = "file-list";
+
+  private String sourcePrefix;
+  private String targetPrefix;
+  private String startVersionName;
+  private String endVersionName;
+  private String stagingDir;
+
+  private final Table table;
+
+  RewriteTablePathSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RewriteTablePath self() {
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath rewriteLocationPrefix(String sPrefix, String 
tPrefix) {
+    Preconditions.checkArgument(
+        sPrefix != null && !sPrefix.isEmpty(), "Source prefix('%s') cannot be 
empty.", sPrefix);
+    this.sourcePrefix = sPrefix;
+    this.targetPrefix = tPrefix;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath startVersion(String sVersion) {
+    Preconditions.checkArgument(
+        sVersion != null && !sVersion.trim().isEmpty(),
+        "Start version('%s') cannot be empty.",
+        sVersion);
+    this.startVersionName = sVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath endVersion(String eVersion) {
+    Preconditions.checkArgument(
+        eVersion != null && !eVersion.trim().isEmpty(),
+        "End version('%s') cannot be empty.",
+        eVersion);
+    this.endVersionName = eVersion;
+    return this;
+  }
+
+  @Override
+  public RewriteTablePath stagingLocation(String stagingLocation) {
+    Preconditions.checkArgument(
+        stagingLocation != null && !stagingLocation.isEmpty(),
+        "Staging location('%s') cannot be empty.",
+        stagingLocation);
+    this.stagingDir = stagingLocation;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    validateInputs();
+    JobGroupInfo info = newJobGroupInfo("REWRITE-TABLE-PATH", jobDesc());
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  private Result doExecute() {
+    String resultLocation = rebuildMetadata();
+    return ImmutableRewriteTablePath.Result.builder()
+        .stagingLocation(stagingDir)
+        .fileListLocation(resultLocation)
+        .latestVersion(RewriteTablePathUtil.fileName(endVersionName))
+        .build();
+  }
+
+  private void validateInputs() {
+    Preconditions.checkArgument(
+        sourcePrefix != null && !sourcePrefix.isEmpty(),
+        "Source prefix('%s') cannot be empty.",
+        sourcePrefix);
+    Preconditions.checkArgument(
+        targetPrefix != null && !targetPrefix.isEmpty(),
+        "Target prefix('%s') cannot be empty.",
+        targetPrefix);
+    Preconditions.checkArgument(
+        !sourcePrefix.equals(targetPrefix),
+        "Source prefix cannot be the same as target prefix (%s)",
+        sourcePrefix);
+
+    validateAndSetEndVersion();
+    validateAndSetStartVersion();
+
+    if (stagingDir == null) {
+      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + 
UUID.randomUUID() + "/";
+    } else if (!stagingDir.endsWith("/")) {
+      stagingDir = stagingDir + "/";
+    }
+  }
+
+  private void validateAndSetEndVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (endVersionName == null) {
+      LOG.info("No end version specified. Will stage all files to the latest 
table version.");
+      Preconditions.checkNotNull(
+          tableMetadata.metadataFileLocation(), "Metadata file location should 
not be null");
+      this.endVersionName = tableMetadata.metadataFileLocation();
+    } else {
+      this.endVersionName = validateVersion(tableMetadata, endVersionName);
+    }
+  }
+
+  private void validateAndSetStartVersion() {
+    TableMetadata tableMetadata = ((HasTableOperations) 
table).operations().current();
+
+    if (startVersionName != null) {
+      this.startVersionName = validateVersion(tableMetadata, startVersionName);
+    }
+  }
+
+  private String validateVersion(TableMetadata tableMetadata, String 
versionFileName) {
+    String versionFile = versionFile(tableMetadata, versionFileName);
+
+    Preconditions.checkNotNull(
+        versionFile, "Version file %s does not exist in metadata log.", 
versionFile);
+    Preconditions.checkArgument(
+        fileExist(versionFile), "Version file %s does not exist.", 
versionFile);
+    return versionFile;
+  }
+
+  private String versionFile(TableMetadata metadata, String versionFileName) {
+    if (versionInFilePath(metadata.metadataFileLocation(), versionFileName)) {
+      return metadata.metadataFileLocation();
+    }
+
+    for (MetadataLogEntry log : metadata.previousFiles()) {
+      if (versionInFilePath(log.file(), versionFileName)) {
+        return log.file();
+      }
+    }
+    return null;
+  }
+
+  private boolean versionInFilePath(String path, String version) {
+    return RewriteTablePathUtil.fileName(path).equals(version);
+  }
+
+  private String jobDesc() {
+    if (startVersionName != null) {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "up to version '%s'.",
+          sourcePrefix, targetPrefix, table.name(), endVersionName);
+    } else {
+      return String.format(
+          "Replacing path prefixes '%s' with '%s' in the metadata files of 
table %s,"
+              + "from version '%s' to '%s'.",
+          sourcePrefix, targetPrefix, table.name(), startVersionName, 
endVersionName);
+    }
+  }
+
+  /**
+   * Rebuild metadata in a staging location, with paths rewritten.
+   *
+   * <ul>
+   *   <li>Rebuild version files to staging
+   *   <li>Rebuild manifest list files to staging
+   *   <li>Rebuild manifest to staging
+   *   <li>Get all files needed to move
+   * </ul>
+   */
+  private String rebuildMetadata() {
+    TableMetadata startMetadata =
+        startVersionName != null
+            ? ((HasTableOperations) newStaticTable(startVersionName, 
table.io()))
+                .operations()
+                .current()
+            : null;
+    TableMetadata endMetadata =
+        ((HasTableOperations) newStaticTable(endVersionName, 
table.io())).operations().current();
+
+    Preconditions.checkArgument(
+        endMetadata.statisticsFiles() == null || 
endMetadata.statisticsFiles().isEmpty(),
+        "Statistic files are not supported yet.");
+
+    // rebuild version files
+    RewriteResult<Snapshot> rewriteVersionResult = 
rewriteVersionFiles(endMetadata);
+    Set<Snapshot> diffSnapshots =
+        getDiffSnapshotIds(startMetadata, rewriteVersionResult.toRewrite());
+
+    Set<String> manifestsToRewrite = manifestsToRewrite(diffSnapshots, 
startMetadata);
+    Set<Snapshot> validSnapshots =
+        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+
+    // rebuild manifest-list files
+    RewriteResult<ManifestFile> rewriteManifestListResult =
+        validSnapshots.stream()
+            .map(snapshot -> rewriteManifestList(snapshot, endMetadata, 
manifestsToRewrite))
+            .reduce(new RewriteResult<>(), RewriteResult::append);
+
+    // rebuild manifest files
+    RewriteResult<DeleteFile> rewriteManifestResult =
+        rewriteManifests(endMetadata, rewriteManifestListResult.toRewrite());
+
+    // rebuild position delete files
+    rewritePositionDeletes(endMetadata, rewriteManifestResult.toRewrite());
+
+    Set<Pair<String, String>> copyPlan = Sets.newHashSet();
+    copyPlan.addAll(rewriteVersionResult.copyPlan());
+    copyPlan.addAll(rewriteManifestListResult.copyPlan());
+    copyPlan.addAll(rewriteManifestResult.copyPlan());
+
+    return saveFileList(copyPlan);
+  }
+
+  private String saveFileList(Set<Pair<String, String>> filesToMove) {
+    List<Tuple2<String, String>> fileList =
+        filesToMove.stream()
+            .map(p -> Tuple2.apply(p.first(), p.second()))
+            .collect(Collectors.toList());
+    Dataset<Tuple2<String, String>> fileListDataset =
+        spark().createDataset(fileList, Encoders.tuple(Encoders.STRING(), 
Encoders.STRING()));
+    String fileListPath = stagingDir + RESULT_LOCATION;
+    fileListDataset
+        .repartition(1)
+        .write()
+        .mode(SaveMode.Overwrite)
+        .format("csv")
+        .save(fileListPath);
+    return fileListPath;
+  }
+
+  private Set<Snapshot> getDiffSnapshotIds(
+      TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
+    if (startMetadata == null) {
+      return allSnapshots;
+    } else {
+      Set<Long> startSnapshotIds =
+          
startMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+      return allSnapshots.stream()
+          .filter(s -> !startSnapshotIds.contains(s.snapshotId()))
+          .collect(Collectors.toSet());
+    }
+  }
+
+  private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata 
endMetadata) {
+    RewriteResult<Snapshot> result = new RewriteResult<>();
+    result.toRewrite().addAll(endMetadata.snapshots());
+    result.copyPlan().add(rewriteVersionFile(endMetadata, endVersionName));
+
+    List<MetadataLogEntry> versions = endMetadata.previousFiles();
+    for (int i = versions.size() - 1; i >= 0; i--) {
+      String versionFilePath = versions.get(i).file();
+      if (versionFilePath.equals(startVersionName)) {
+        break;
+      }
+
+      Preconditions.checkArgument(
+          fileExist(versionFilePath),
+          String.format("Version file %s doesn't exist", versionFilePath));
+      TableMetadata tableMetadata =
+          new StaticTableOperations(versionFilePath, table.io()).current();
+
+      result.toRewrite().addAll(tableMetadata.snapshots());
+      result.copyPlan().add(rewriteVersionFile(tableMetadata, 
versionFilePath));
+    }
+
+    return result;
+  }
+
+  private Pair<String, String> rewriteVersionFile(TableMetadata metadata, 
String versionFilePath) {
+    String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, 
stagingDir);
+    TableMetadata newTableMetadata =
+        RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, 
targetPrefix);
+    TableMetadataParser.overwrite(newTableMetadata, 
table.io().newOutputFile(stagingPath));
+    return Pair.of(stagingPath, newPath(versionFilePath, sourcePrefix, 
targetPrefix));
+  }
+
+  /**
+   * Rewrite a manifest list representing a snapshot.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param tableMetadata metadata of table
+   * @param manifestsToRewrite filter of manifests to rewrite.
+   * @return a result including a copy plan for the manifests contained in the 
manifest list, as
+   *     well as for the manifest list itself
+   */
+  private RewriteResult<ManifestFile> rewriteManifestList(
+      Snapshot snapshot, TableMetadata tableMetadata, Set<String> 
manifestsToRewrite) {
+    RewriteResult<ManifestFile> result = new RewriteResult<>();
+
+    String path = snapshot.manifestListLocation();
+    String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
+    RewriteResult<ManifestFile> rewriteResult =
+        RewriteTablePathUtil.rewriteManifestList(
+            snapshot,
+            table.io(),
+            tableMetadata,
+            manifestsToRewrite,
+            sourcePrefix,
+            targetPrefix,
+            stagingDir,
+            outputPath);
+
+    result.append(rewriteResult);
+    // add the manifest list copy plan itself to the result
+    result.copyPlan().add(Pair.of(outputPath, newPath(path, sourcePrefix, 
targetPrefix)));
+    return result;
+  }
+
+  private Set<String> manifestsToRewrite(Set<Snapshot> diffSnapshots, 
TableMetadata startMetadata) {
+    try {
+      Table endStaticTable = newStaticTable(endVersionName, table.io());
+      Dataset<Row> lastVersionFiles = 
manifestDS(endStaticTable).select("path");
+      if (startMetadata == null) {
+        return 
Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+      } else {
+        Set<Long> diffSnapshotIds =
+            
diffSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+        return Sets.newHashSet(
+            lastVersionFiles
+                .distinct()
+                
.filter(functions.column("added_snapshot_id").isInCollection(diffSnapshotIds))
+                .as(Encoders.STRING())
+                .collectAsList());
+      }
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          "Failed to build the manifest files dataframe, the end version you 
are "
+              + "trying to copy may contain invalid snapshots, please a 
younger version that doesn't have invalid "
+              + "snapshots",

Review Comment:
   I think we might be missing a verb in the second half of the sentence. How about:
   `"Unable to build the manifest files dataframe. The end version in use may 
contain invalid snapshots. Please choose an earlier version without invalid 
snapshots"`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to